Skip to content

Commit b3ee921

Browse files
committed
builtins: add builtin to show jobs pending creation in session
Previously these were only accessible by materializing the whole crdb_internal.jobs vtable. If/when we remove that vtable or replace it with a view, we'll still need to read these pending jobs from memory, but we don't need a whole vtable for that and can just use a builtin instead. Release note: none. Epic: none.
1 parent fd18c1b commit b3ee921

File tree

8 files changed

+107
-1
lines changed

8 files changed

+107
-1
lines changed

pkg/jobs/jobspb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
deps = [
1616
"//pkg/base",
1717
"//pkg/roachpb",
18+
"//pkg/security/username",
1819
"//pkg/sql/catalog/catpb",
1920
"//pkg/sql/catalog/descpb",
2021
"//pkg/sql/protoreflect",

pkg/jobs/jobspb/jobs.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"slices"
1212

1313
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/security/username"
1415
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
1516
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1617
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
@@ -23,6 +24,14 @@ type JobID = catpb.JobID
2324
// InvalidJobID is the zero value for JobID corresponding to no job.
2425
const InvalidJobID = catpb.InvalidJobID
2526

27+
// PendingJob represents a job that is pending creation (e.g. in a session).
28+
type PendingJob struct {
29+
JobID JobID
30+
Type Type
31+
Description string
32+
Username username.SQLUsername
33+
}
34+
2635
// ToText implements the ProtobinExecutionDetailFile interface.
2736
func (t *TraceData) ToText() []byte {
2837
rec := tracingpb.Recording(t.CollectedSpans)

pkg/sql/faketreeeval/evalctx.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,13 @@ func (ep *DummySessionAccessor) HasViewActivityOrViewActivityRedactedRole(
647647
return false, false, errors.WithStack(errEvalSessionVar)
648648
}
649649

650+
func (ep *DummySessionAccessor) ForEachSessionPendingJob(
651+
_ func(job jobspb.PendingJob) error,
652+
) error {
653+
// This is a no-op in the dummy implementation.
654+
return nil
655+
}
656+
650657
// DummyClientNoticeSender implements the eval.ClientNoticeSender interface.
651658
type DummyClientNoticeSender struct{}
652659

pkg/sql/jobs_collection.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,21 @@ func (j *txnJobsCollection) forEachToCreate(fn func(jobRecord *jobs.Record) erro
8282
}
8383
return nil
8484
}
85+
86+
func (p *planner) ForEachSessionPendingJob(fn func(job jobspb.PendingJob) error) error {
87+
if p.extendedEvalCtx.jobs == nil {
88+
return nil
89+
}
90+
return p.extendedEvalCtx.jobs.forEachToCreate(func(r *jobs.Record) error {
91+
payloadType, err := jobspb.DetailsType(jobspb.WrapPayloadDetails(r.Details))
92+
if err != nil {
93+
return err
94+
}
95+
return fn(jobspb.PendingJob{
96+
JobID: r.JobID,
97+
Description: r.Description,
98+
Username: r.Username,
99+
Type: payloadType,
100+
})
101+
})
102+
}

pkg/sql/opt/exec/execbuilder/testdata/explain

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ distribution: local
121121
vectorized: true
122122
·
123123
• sort
124-
│ order: -column26,-started
124+
│ order: -column20,-created
125125
126126
└── • render
127127

pkg/sql/sem/builtins/fixed_oids.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2665,6 +2665,7 @@ var builtinOidsArray = []string{
26652665
2702: `crdb_internal.force_retry(val: int) -> int`,
26662666
2703: `crdb_internal.show_create_all_routines(database_name: string) -> string`,
26672667
2704: `crdb_internal.show_create_all_triggers(database_name: string) -> string`,
2668+
2705: `crdb_internal.session_pending_jobs() -> tuple{int AS job_id, string AS job_type, string AS description, string AS user_name}`,
26682669
}
26692670

26702671
var builtinOidsBySignature map[string]oid.Oid

pkg/sql/sem/builtins/generator_builtins.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"time"
1717

1818
"github.com/cockroachdb/cockroach/pkg/build"
19+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1920
"github.com/cockroachdb/cockroach/pkg/keys"
2021
"github.com/cockroachdb/cockroach/pkg/kv"
2122
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -633,6 +634,19 @@ The output can be used to recreate a database.'
633634
volatility.Volatile,
634635
),
635636
),
637+
"crdb_internal.session_pending_jobs": makeBuiltin(
638+
tree.FunctionProperties{
639+
Category: builtinconstants.CategorySystemInfo,
640+
DistsqlBlocklist: true, // applicable only on the gateway
641+
},
642+
makeGeneratorOverload(
643+
tree.ParamTypes{},
644+
sessionPendingJobsType,
645+
makeSessionPendingJobsGenerator,
646+
`Returns rows of information about all pending jobs created in the session txn.`,
647+
volatility.Volatile,
648+
),
649+
),
636650
"crdb_internal.decode_plan_gist": makeBuiltin(
637651
tree.FunctionProperties{
638652
Category: builtinconstants.CategorySystemInfo,
@@ -2875,6 +2889,10 @@ var showCreateAllTriggersGeneratorType = types.String
28752889
var showCreateAllTypesGeneratorType = types.String
28762890
var showCreateAllTablesGeneratorType = types.String
28772891
var showCreateAllRoutinesGeneratorType = types.String
2892+
var sessionPendingJobsType = types.MakeLabeledTuple(
2893+
[]*types.T{types.Int, types.String, types.String, types.String},
2894+
[]string{"job_id", "job_type", "description", "user_name"},
2895+
)
28782896

28792897
// Phase is used to determine if CREATE statements or ALTER statements
28802898
// are being generated for showCreateAllTables.
@@ -3372,6 +3390,53 @@ func makeShowCreateAllRoutinesGenerator(
33723390
}, nil
33733391
}
33743392

3393+
func makeSessionPendingJobsGenerator(
3394+
ctx context.Context, evalCtx *eval.Context, args tree.Datums,
3395+
) (eval.ValueGenerator, error) {
3396+
records := []jobspb.PendingJob{{}} // Next() always pops first, so pad a zero.
3397+
if err := evalCtx.SessionAccessor.ForEachSessionPendingJob(func(r jobspb.PendingJob) error {
3398+
records = append(records, r)
3399+
return nil
3400+
}); err != nil {
3401+
return nil, err
3402+
}
3403+
return &sessionPendingJobsGenerator{
3404+
jobs: records,
3405+
}, nil
3406+
}
3407+
3408+
type sessionPendingJobsGenerator struct {
3409+
jobs []jobspb.PendingJob
3410+
}
3411+
3412+
// ResolvedType implements the eval.ValueGenerator interface.
3413+
func (s *sessionPendingJobsGenerator) ResolvedType() *types.T {
3414+
return sessionPendingJobsType
3415+
}
3416+
3417+
// Start implements the eval.ValueGenerator interface.
3418+
func (s *sessionPendingJobsGenerator) Start(ctx context.Context, txn *kv.Txn) error {
3419+
return nil
3420+
}
3421+
3422+
func (s *sessionPendingJobsGenerator) Next(ctx context.Context) (bool, error) {
3423+
s.jobs = s.jobs[1:]
3424+
return len(s.jobs) > 0, nil
3425+
}
3426+
3427+
// Values implements the eval.ValueGenerator interface.
3428+
func (s *sessionPendingJobsGenerator) Values() (tree.Datums, error) {
3429+
return tree.Datums{tree.NewDInt(tree.DInt(s.jobs[0].JobID)),
3430+
tree.NewDString(s.jobs[0].Type.String()),
3431+
tree.NewDString(s.jobs[0].Description),
3432+
tree.NewDString(s.jobs[0].Username.Normalized()),
3433+
}, nil
3434+
}
3435+
3436+
// Close implements the eval.ValueGenerator interface.
3437+
func (s *sessionPendingJobsGenerator) Close(ctx context.Context) {
3438+
}
3439+
33753440
// identGenerator supports the execution of
33763441
// crdb_internal.gen_rand_ident().
33773442
type identGenerator struct {

pkg/sql/sem/eval/deps.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,11 @@ type SessionAccessor interface {
530530
// HasViewActivityOrViewActivityRedactedRole returns true iff the current session user has the
531531
// VIEWACTIVITY or VIEWACTIVITYREDACTED permission.
532532
HasViewActivityOrViewActivityRedactedRole(ctx context.Context) (bool, bool, error)
533+
534+
// ForEachSessionPendingJob calls the provided function for each pending job
535+
// created in the session (hidden behind the generic interface{} to avoid
536+
// circular dependencies, but the caller can cast it to jobs.Record).
537+
ForEachSessionPendingJob(fn func(record jobspb.PendingJob) error) error
533538
}
534539

535540
// PreparedStatementState is a limited interface that exposes metadata about

0 commit comments

Comments
 (0)