Skip to content

Commit 8f413be

Browse files
craig[bot]andyyang890KeithChaa-joshi
committed
153626: jobs: add invalid job ID checks to storage types r=dt a=andyyang890 This patch aims to prevent a class of bugs resulting from jobs unintentionally passing zero job IDs when accessing the job storage tables, which could have unintended consequences. One way this could conceivably happen is as a result of not initializing a field or var that stores a job ID. Epic: None Release note: None 153809: changefeedccl: implement INCLUDE TABLES syntax in DB-level feeds r=aerfrei,asg0451 a=KeithCh The syntax for INCLUDE TABLES will be CREATE CHANGEFEED FOR DATABASE FOO EXCLUDE TABLES fizz,buzz; Resolves #147420 Release note: None 153852: util/metric: introduce HighCardinalityHistogram in aggregated metrics r=aa-joshi a=aa-joshi This patch introduces `HighCardinalityHistogram` metric which is similar to the `HighCardinalityCounter` introduced in #153568. It relies on unordered cache with LRU eviction as child storage. The parent values represents the aggregation of all child metric values. The child metrics values are only exported and aggregated values is persisted in CRDB. It relies on LabelSliceCache to efficiently store label values at registry. The child metric eviction policy is combination of max cache size of 5000 and minimum retention time of 20 seconds. This guarantees that we would see the child metric values at least in one scrape with default interval of 10 seconds before getting evicted due to cache size. The child eviction won't impact the parent value. Epic: CRDB-53398 Part of: CRDB-53833 Release note: None Co-authored-by: Andy Yang <[email protected]> Co-authored-by: Keith Chow <[email protected]> Co-authored-by: Akshay Joshi <[email protected]>
4 parents 1905ae4 + 2469c6a + 4f28164 + 3b5f219 commit 8f413be

14 files changed

+1120
-5
lines changed

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1963,6 +1963,7 @@ opt_changefeed_sink ::=
19631963

19641964
db_level_changefeed_filter_option ::=
19651965
'EXCLUDE' 'TABLES' table_name_list
1966+
| 'INCLUDE' 'TABLES' table_name_list
19661967
|
19671968

19681969
target_list ::=

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,36 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) {
224224
cdcTest(t, testFn)
225225
}
226226

227-
func TestDatabaseLevelChangefeedWithFilter(t *testing.T) {
227+
func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
228+
defer leaktest.AfterTest(t)()
229+
defer log.Scope(t).Close(t)
230+
231+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
232+
expectSuccess := func(stmt string) {
233+
successfulFeed := feed(t, f, stmt)
234+
defer closeFeed(t, successfulFeed)
235+
_, err := successfulFeed.Next()
236+
require.NoError(t, err)
237+
}
238+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
239+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
240+
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
241+
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
242+
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
243+
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`)
244+
sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`)
245+
246+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`)
247+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo,foo2`)
248+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.bar.fizz, foo.foo2, foo`)
249+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*`,
250+
`at or near "*": syntax error`)
251+
// TODO(#147421): Assert payload once the filter works
252+
}
253+
cdcTest(t, testFn, feedTestEnterpriseSinks)
254+
}
255+
256+
func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) {
228257
defer leaktest.AfterTest(t)()
229258
defer log.Scope(t).Close(t)
230259

pkg/jobs/job_info_storage.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func (j *Job) ProgressStorage() ProgressStorage {
4747
func (i ProgressStorage) Get(
4848
ctx context.Context, txn isql.Txn,
4949
) (float64, hlc.Timestamp, time.Time, error) {
50+
if jobspb.JobID(i) == jobspb.InvalidJobID {
51+
return 0, hlc.Timestamp{}, time.Time{}, errors.AssertionFailedf("invalid job ID")
52+
}
53+
5054
ctx, sp := tracing.ChildSpan(ctx, "get-job-progress")
5155
defer sp.Finish()
5256

@@ -96,6 +100,10 @@ func (i ProgressStorage) Get(
96100
func (i ProgressStorage) Set(
97101
ctx context.Context, txn isql.Txn, fraction float64, resolved hlc.Timestamp,
98102
) error {
103+
if jobspb.JobID(i) == jobspb.InvalidJobID {
104+
return errors.AssertionFailedf("invalid job ID")
105+
}
106+
99107
ctx, sp := tracing.ChildSpan(ctx, "write-job-progress")
100108
defer sp.Finish()
101109

@@ -166,6 +174,10 @@ func (j *Job) StatusStorage() StatusStorage {
166174

167175
// Clear clears the status message row for the job, if it exists.
168176
func (i StatusStorage) Clear(ctx context.Context, txn isql.Txn) error {
177+
if jobspb.JobID(i) == jobspb.InvalidJobID {
178+
return errors.AssertionFailedf("invalid job ID")
179+
}
180+
169181
_, err := txn.ExecEx(
170182
ctx, "clear-job-status-delete", txn.KV(), sessiondata.NodeUserSessionDataOverride,
171183
`DELETE FROM system.job_status WHERE job_id = $1`, i,
@@ -176,6 +188,10 @@ func (i StatusStorage) Clear(ctx context.Context, txn isql.Txn) error {
176188
// Sets writes the current status, replacing the current one if it exists.
177189
// Setting an empty status is the same as calling Clear().
178190
func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) error {
191+
if jobspb.JobID(i) == jobspb.InvalidJobID {
192+
return errors.AssertionFailedf("invalid job ID")
193+
}
194+
179195
ctx, sp := tracing.ChildSpan(ctx, "write-job-status")
180196
defer sp.Finish()
181197

@@ -197,6 +213,10 @@ func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) err
197213

198214
// Get gets the current status mesasge for a job, if any.
199215
func (i StatusStorage) Get(ctx context.Context, txn isql.Txn) (string, time.Time, error) {
216+
if jobspb.JobID(i) == jobspb.InvalidJobID {
217+
return "", time.Time{}, errors.AssertionFailedf("invalid job ID")
218+
}
219+
200220
ctx, sp := tracing.ChildSpan(ctx, "get-job-status")
201221
defer sp.Finish()
202222

@@ -247,6 +267,10 @@ func (j *Job) Messages() MessageStorage {
247267
// log for this job, and prunes retained messages of the same kind based on the
248268
// configured limit to keep the total number of retained messages bounded.
249269
func (i MessageStorage) Record(ctx context.Context, txn isql.Txn, kind, message string) error {
270+
if jobspb.JobID(i) == jobspb.InvalidJobID {
271+
return errors.AssertionFailedf("invalid job ID")
272+
}
273+
250274
ctx, sp := tracing.ChildSpan(ctx, "write-job-message")
251275
defer sp.Finish()
252276

@@ -279,6 +303,10 @@ type JobMessage struct {
279303
}
280304

281305
func (i MessageStorage) Fetch(ctx context.Context, txn isql.Txn) (_ []JobMessage, retErr error) {
306+
if jobspb.JobID(i) == jobspb.InvalidJobID {
307+
return nil, errors.AssertionFailedf("invalid job ID")
308+
}
309+
282310
ctx, sp := tracing.ChildSpan(ctx, "get-all-job-message")
283311
defer sp.Finish()
284312

@@ -374,6 +402,9 @@ func (i *InfoStorage) checkClaimSession(ctx context.Context) error {
374402
}
375403

376404
func (i InfoStorage) get(ctx context.Context, opName, infoKey string) ([]byte, bool, error) {
405+
if i.j.ID() == jobspb.InvalidJobID {
406+
return nil, false, errors.AssertionFailedf("invalid job ID")
407+
}
377408
if i.txn == nil {
378409
return nil, false, errors.New("cannot access the job info table without an associated txn")
379410
}
@@ -440,6 +471,9 @@ func (i InfoStorage) write(
440471
func (i InfoStorage) doWrite(
441472
ctx context.Context, fn func(ctx context.Context, job *Job, txn isql.Txn) error,
442473
) error {
474+
if i.j.ID() == jobspb.InvalidJobID {
475+
return errors.AssertionFailedf("invalid job ID")
476+
}
443477
if i.txn == nil {
444478
return errors.New("cannot write to the job info table without an associated txn")
445479
}
@@ -466,6 +500,9 @@ func (i InfoStorage) iterate(
466500
infoPrefix string,
467501
fn func(infoKey string, value []byte) error,
468502
) (retErr error) {
503+
if i.j.ID() == jobspb.InvalidJobID {
504+
return errors.AssertionFailedf("invalid job ID")
505+
}
469506
if i.txn == nil {
470507
return errors.New("cannot iterate over the job info table without an associated txn")
471508
}
@@ -589,6 +626,9 @@ func (i InfoStorage) DeleteRange(
589626

590627
// Count counts the info records in the range [start, end).
591628
func (i InfoStorage) Count(ctx context.Context, startInfoKey, endInfoKey string) (int, error) {
629+
if i.j.ID() == jobspb.InvalidJobID {
630+
return 0, errors.AssertionFailedf("invalid job ID")
631+
}
592632
if i.txn == nil {
593633
return 0, errors.New("cannot access the job info table without an associated txn")
594634
}

pkg/jobs/job_info_storage_test.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,3 +622,228 @@ func TestJobProgressAndStatusAccessors(t *testing.T) {
622622
sql.CheckQueryResults(t, fmt.Sprintf("SELECT status from system.job_status where job_id = %d", job4.ID()), [][]string{{"c"}})
623623
})
624624
}
625+
626+
func TestStorageRejectsInvalidJobID(t *testing.T) {
627+
defer leaktest.AfterTest(t)()
628+
defer log.Scope(t).Close(t)
629+
630+
ctx := context.Background()
631+
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
632+
defer s.Stopper().Stop(ctx)
633+
634+
db := s.InternalDB().(isql.DB)
635+
636+
t.Run("ProgressStorage", func(t *testing.T) {
637+
progressStorage := jobs.ProgressStorage(jobspb.InvalidJobID)
638+
639+
t.Run("Get", func(t *testing.T) {
640+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
641+
_, _, _, err := progressStorage.Get(ctx, txn)
642+
require.Error(t, err)
643+
require.Contains(t, err.Error(), "invalid job ID")
644+
return nil
645+
}))
646+
})
647+
648+
t.Run("Set", func(t *testing.T) {
649+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
650+
err := progressStorage.Set(ctx, txn, 0.5, hlc.Timestamp{})
651+
require.Error(t, err)
652+
require.Contains(t, err.Error(), "invalid job ID")
653+
return nil
654+
}))
655+
})
656+
})
657+
658+
t.Run("StatusStorage", func(t *testing.T) {
659+
statusStorage := jobs.StatusStorage(jobspb.InvalidJobID)
660+
661+
t.Run("Get", func(t *testing.T) {
662+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
663+
_, _, err := statusStorage.Get(ctx, txn)
664+
require.Error(t, err)
665+
require.Contains(t, err.Error(), "invalid job ID")
666+
return nil
667+
}))
668+
})
669+
670+
t.Run("Set", func(t *testing.T) {
671+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
672+
err := statusStorage.Set(ctx, txn, "test status")
673+
require.Error(t, err)
674+
require.Contains(t, err.Error(), "invalid job ID")
675+
return nil
676+
}))
677+
})
678+
679+
t.Run("Clear", func(t *testing.T) {
680+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
681+
err := statusStorage.Clear(ctx, txn)
682+
require.Error(t, err)
683+
require.Contains(t, err.Error(), "invalid job ID")
684+
return nil
685+
}))
686+
})
687+
})
688+
689+
t.Run("MessageStorage", func(t *testing.T) {
690+
messageStorage := jobs.MessageStorage(jobspb.InvalidJobID)
691+
692+
t.Run("Record", func(t *testing.T) {
693+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
694+
err := messageStorage.Record(ctx, txn, "test", "test message")
695+
require.Error(t, err)
696+
require.Contains(t, err.Error(), "invalid job ID")
697+
return nil
698+
}))
699+
})
700+
701+
t.Run("Fetch", func(t *testing.T) {
702+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
703+
_, err := messageStorage.Fetch(ctx, txn)
704+
require.Error(t, err)
705+
require.Contains(t, err.Error(), "invalid job ID")
706+
return nil
707+
}))
708+
})
709+
})
710+
711+
t.Run("InfoStorage", func(t *testing.T) {
712+
t.Run("Get", func(t *testing.T) {
713+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
714+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
715+
_, _, err := infoStorage.Get(ctx, "test-op", "test-key")
716+
require.Error(t, err)
717+
require.Contains(t, err.Error(), "invalid job ID")
718+
return nil
719+
}))
720+
})
721+
722+
t.Run("Write", func(t *testing.T) {
723+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
724+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
725+
err := infoStorage.Write(ctx, "test-key", []byte("test-value"))
726+
require.Error(t, err)
727+
require.Contains(t, err.Error(), "invalid job ID")
728+
return nil
729+
}))
730+
})
731+
732+
t.Run("WriteFirstKey", func(t *testing.T) {
733+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
734+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
735+
err := infoStorage.WriteFirstKey(ctx, "test-key", []byte("test-value"))
736+
require.Error(t, err)
737+
require.Contains(t, err.Error(), "invalid job ID")
738+
return nil
739+
}))
740+
})
741+
742+
t.Run("Delete", func(t *testing.T) {
743+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
744+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
745+
err := infoStorage.Delete(ctx, "test-key")
746+
require.Error(t, err)
747+
require.Contains(t, err.Error(), "invalid job ID")
748+
return nil
749+
}))
750+
})
751+
752+
t.Run("DeleteRange", func(t *testing.T) {
753+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
754+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
755+
err := infoStorage.DeleteRange(ctx, "start", "end", 0)
756+
require.Error(t, err)
757+
require.Contains(t, err.Error(), "invalid job ID")
758+
return nil
759+
}))
760+
})
761+
762+
t.Run("Count", func(t *testing.T) {
763+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
764+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
765+
_, err := infoStorage.Count(ctx, "start", "end")
766+
require.Error(t, err)
767+
require.Contains(t, err.Error(), "invalid job ID")
768+
return nil
769+
}))
770+
})
771+
772+
t.Run("Iterate", func(t *testing.T) {
773+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
774+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
775+
err := infoStorage.Iterate(ctx, "prefix", func(key string, value []byte) error {
776+
return nil
777+
})
778+
require.Error(t, err)
779+
require.Contains(t, err.Error(), "invalid job ID")
780+
return nil
781+
}))
782+
})
783+
784+
t.Run("GetLast", func(t *testing.T) {
785+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
786+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
787+
err := infoStorage.GetLast(ctx, "prefix", func(key string, value []byte) error {
788+
return nil
789+
})
790+
require.Error(t, err)
791+
require.Contains(t, err.Error(), "invalid job ID")
792+
return nil
793+
}))
794+
})
795+
796+
t.Run("GetLegacyPayload", func(t *testing.T) {
797+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
798+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
799+
_, _, err := infoStorage.GetLegacyPayload(ctx, "test-op")
800+
require.Error(t, err)
801+
require.Contains(t, err.Error(), "invalid job ID")
802+
return nil
803+
}))
804+
})
805+
806+
t.Run("WriteLegacyPayload", func(t *testing.T) {
807+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
808+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
809+
err := infoStorage.WriteLegacyPayload(ctx, []byte("test-payload"))
810+
require.Error(t, err)
811+
require.Contains(t, err.Error(), "invalid job ID")
812+
return nil
813+
}))
814+
})
815+
816+
t.Run("GetLegacyProgress", func(t *testing.T) {
817+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
818+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
819+
_, _, err := infoStorage.GetLegacyProgress(ctx, "test-op")
820+
require.Error(t, err)
821+
require.Contains(t, err.Error(), "invalid job ID")
822+
return nil
823+
}))
824+
})
825+
826+
t.Run("WriteLegacyProgress", func(t *testing.T) {
827+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
828+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
829+
err := infoStorage.WriteLegacyProgress(ctx, []byte("test-progress"))
830+
require.Error(t, err)
831+
require.Contains(t, err.Error(), "invalid job ID")
832+
return nil
833+
}))
834+
})
835+
})
836+
837+
// Verify no rows were created in any of the job tables.
838+
sqlDB := sqlutils.MakeSQLRunner(s.SQLConn(t))
839+
sqlDB.CheckQueryResults(t,
840+
"SELECT count(*) FROM system.job_progress WHERE job_id = 0", [][]string{{"0"}})
841+
sqlDB.CheckQueryResults(t,
842+
"SELECT count(*) FROM system.job_progress_history WHERE job_id = 0", [][]string{{"0"}})
843+
sqlDB.CheckQueryResults(t,
844+
"SELECT count(*) FROM system.job_status WHERE job_id = 0", [][]string{{"0"}})
845+
sqlDB.CheckQueryResults(t,
846+
"SELECT count(*) FROM system.job_message WHERE job_id = 0", [][]string{{"0"}})
847+
sqlDB.CheckQueryResults(t,
848+
"SELECT count(*) FROM system.job_info WHERE job_id = 0", [][]string{{"0"}})
849+
}

0 commit comments

Comments
 (0)