Skip to content

Commit cddd383

Browse files
craig[bot]msbutler
andcommitted
Merge #157070
157070: jobs: log message on job pause and cancel state change r=jeffswenson a=msbutler The message logger is supposed to log all job state changes. Epic: none Release note: none Co-authored-by: Michael Butler <[email protected]>
2 parents c24bbca + c585450 commit cddd383

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed

pkg/jobs/adopt.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,11 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
501501
job := &Job{id: id, registry: r}
502502
stateString := *row[1].(*tree.DString)
503503
jobTypeString := *row[2].(*tree.DString)
504+
505+
if err := job.Messages().Record(ctx, txn, "state", string(stateString)); err != nil {
506+
return err
507+
}
508+
504509
switch State(stateString) {
505510
case StatePaused:
506511
if !r.cancelRegisteredJobContext(id) {

pkg/jobs/job_info_storage_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/base"
1717
"github.com/cockroachdb/cockroach/pkg/jobs"
1818
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
19+
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
1920
"github.com/cockroachdb/cockroach/pkg/keyvisualizer"
2021
"github.com/cockroachdb/cockroach/pkg/security/username"
22+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2123
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2224
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2325
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2426
"github.com/cockroachdb/cockroach/pkg/testutils"
27+
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
2528
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2629
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2730
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
@@ -847,3 +850,74 @@ func TestStorageRejectsInvalidJobID(t *testing.T) {
847850
sqlDB.CheckQueryResults(t,
848851
"SELECT count(*) FROM system.job_info WHERE job_id = 0", [][]string{{"0"}})
849852
}
853+
854+
func TestJobPauseStateTransitionsRecorded(t *testing.T) {
855+
defer leaktest.AfterTest(t)()
856+
defer log.Scope(t).Close(t)
857+
858+
ctx := context.Background()
859+
860+
args := base.TestServerArgs{
861+
Knobs: base.TestingKnobs{
862+
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
863+
},
864+
}
865+
s, sqlDB, _ := serverutils.StartServer(t, args)
866+
defer s.Stopper().Stop(ctx)
867+
868+
sql := sqlutils.MakeSQLRunner(sqlDB)
869+
idb := s.InternalDB().(isql.DB)
870+
r := s.JobRegistry().(*jobs.Registry)
871+
872+
blockCh := make(chan struct{})
873+
defer close(blockCh)
874+
875+
// Register a fake resumer so the job doesn't complete during the test.
876+
cleanup := jobs.TestingRegisterConstructor(jobspb.TypeBackup, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
877+
return jobstest.FakeResumer{
878+
OnResume: func(ctx context.Context) error {
879+
select {
880+
case <-ctx.Done():
881+
return ctx.Err()
882+
case <-blockCh:
883+
return nil
884+
}
885+
},
886+
}
887+
}, jobs.UsesTenantCostControl)
888+
defer cleanup()
889+
890+
record := jobs.Record{
891+
Details: jobspb.BackupDetails{},
892+
Progress: jobspb.BackupProgress{},
893+
Username: username.TestUserName(),
894+
}
895+
job, err := jobs.TestingCreateAndStartJob(ctx, r, idb, record)
896+
require.NoError(t, err)
897+
jobutils.WaitForJobToRun(t, sql, job.ID())
898+
899+
sql.Exec(t, "PAUSE JOB $1", job.ID())
900+
jobutils.WaitForJobToPause(t, sql, job.ID())
901+
902+
sql.Exec(t, "RESUME JOB $1", job.ID())
903+
jobutils.WaitForJobToRun(t, sql, job.ID())
904+
905+
sql.Exec(t, "CANCEL JOB $1", job.ID())
906+
jobutils.WaitForJobToCancel(t, sql, job.ID())
907+
908+
var messages []jobs.JobMessage
909+
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
910+
var err error
911+
messages, err = job.Job.Messages().Fetch(ctx, txn)
912+
return err
913+
}))
914+
915+
var stateMessages []string
916+
for _, msg := range messages {
917+
if msg.Kind == "state" {
918+
stateMessages = append(stateMessages, msg.Message)
919+
}
920+
}
921+
922+
require.Equal(t, []string{"canceled", "reverting", "cancel-requested", "running", "paused", "pause-requested"}, stateMessages)
923+
}

0 commit comments

Comments
 (0)