Commit 29c1a7c

Committed by craig[bot], erikgrinaker, and stevendanna
107939: roachtest: add changefeed workload benchmarks r=erikgrinaker a=erikgrinaker

This patch adds a set of benchmarks measuring the workload impact of a changefeed. The workload is single-row KV, read-only and write-only, recording the throughput and latencies of the workload both without and with a changefeed running, graphed via roachperf. The watermark lag is also logged, but not recorded or asserted.

```
cdc/workload/kv0/nodes=5/cpu=16/ranges=100/control [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/control [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv0/nodes=5/cpu=16/ranges=100000/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100/control [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/control [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/server=processor/protocol=mux/format=json/sink=null [cdc]
cdc/workload/kv100/nodes=5/cpu=16/ranges=100000/server=processor/protocol=rangefeed/format=json/sink=null [cdc]
```

Resolves cockroachdb#107441.

Release note: None

108080: upgrades: avoid crdb_internal.system_jobs in upgrade manager r=adityamaru a=stevendanna

crdb_internal.system_jobs is a virtual table that joins information from the jobs table and the job_info table. When given a job status predicate, it does this by running a query such as:

```sql
WITH latestpayload AS (
    SELECT job_id, value
    FROM system.job_info AS payload
    WHERE info_key = 'legacy_payload'
    ORDER BY written DESC
),
latestprogress AS (
    SELECT job_id, value
    FROM system.job_info AS progress
    WHERE info_key = 'legacy_progress'
    ORDER BY written DESC
)
SELECT distinct(id), status, created, payload.value AS payload, progress.value AS progress,
       created_by_type, created_by_id, claim_session_id, claim_instance_id, num_runs, last_run, job_type
FROM system.jobs AS j
INNER JOIN latestpayload AS payload ON j.id = payload.job_id
LEFT JOIN latestprogress AS progress ON j.id = progress.job_id
WHERE j.status = 'cancel-requested';
```

This uses 2 full scans of the job_info table:

```
• distinct
│ distinct on: id, value, value
│
└── • merge join
    │ equality: (job_id) = (id)
    │
    ├── • render
    │   │
    │   └── • filter
    │       │ estimated row count: 2,787
    │       │ filter: info_key = 'legacy_payload'
    │       │
    │       └── • scan
    │             estimated row count: 5,597 (100% of the table; stats collected 27 minutes ago; using stats forecast for 17 minutes ago)
    │             table: job_info@primary
    │             spans: FULL SCAN
    │
    └── • merge join (right outer)
        │ equality: (job_id) = (id)
        │ right cols are key
        │
        ├── • render
        │   │
        │   └── • filter
        │       │ estimated row count: 2,787
        │       │ filter: info_key = 'legacy_progress'
        │       │
        │       └── • scan
        │             estimated row count: 5,597 (100% of the table; stats collected 27 minutes ago; using stats forecast for 17 minutes ago)
        │             table: job_info@primary
        │             spans: FULL SCAN
        │
        └── • index join
            │ table: jobs@primary
            │
            └── • sort
                │ order: +id
                │
                └── • scan
                      missing stats
                      table: jobs@jobs_status_created_idx
                      spans: [/'cancel-requested' - /'cancel-requested']
```

Previously, the upgrade manager was using this virtual table as part of a larger query:

```sql
SELECT id, status
FROM (
    SELECT id,
           status,
           crdb_internal.pb_to_json(
             'cockroach.sql.jobs.jobspb.Payload',
             payload,
             false
           ) AS pl
    FROM crdb_internal.system_jobs
    WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')
)
WHERE pl->'migration'->'clusterVersion' = $1::JSONB;
```

I believe the use of the IN operator causes the virtual index's populate function to be called for each value. Perhaps the optimizer accounts for this in some way so that it doesn't result in 2 * 6 full scans of the job_info table, but it is hard to confirm from the EXPLAIN output. In at least one recent escalation, we observed this query taking a substantial amount of time as it continually conflicted with other job system queries.

Here, we avoid using the virtual table. This allows us to avoid the full scans of the info table, since we don't need the progress (only the payload). It also allows us to use the full `IN` predicate directly, avoiding any uncertainty.

```
• root
│
├── • hash join
│   │ equality: (job_id) = (id)
│   │ right cols are key
│   │
│   ├── • render
│   │   │
│   │   └── • lookup join
│   │       │ table: job_info@primary
│   │       │ equality: (id, lookup_join_const_col_@16) = (job_id,info_key)
│   │       │
│   │       └── • render
│   │           │
│   │           └── • scan buffer
│   │                 label: buffer 1 (running_migration_jobs)
│   │
│   └── • scan buffer
│         label: buffer 1 (running_migration_jobs)
│
└── • subquery
    │ id: @S1
    │ original sql: SELECT id, status FROM system.jobs WHERE (status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')) AND (job_type = 'MIGRATION')
    │ exec mode: all rows
    │
    └── • buffer
        │ label: buffer 1 (running_migration_jobs)
        │
        └── • filter
            │ filter: status IN ('cancel-requested', 'pause-requested', 'paused', 'pending', 'reverting', 'running')
            │
            └── • index join
                │ table: jobs@primary
                │
                └── • scan
                      missing stats
                      table: jobs@jobs_job_type_idx
                      spans: [/'MIGRATION' - /'MIGRATION']
```

In a local example, this is substantially faster:

```
root@localhost:26257/defaultdb> SELECT id, status
  -> FROM (
  ->   SELECT id,
  ->          status,
  ->          crdb_internal.pb_to_json(
  ->            'cockroach.sql.jobs.jobspb.Payload',
  ->            payload,
  ->            false -- emit_defaults
  ->          ) AS pl
  ->   FROM crdb_internal.system_jobs
  ->   WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')
  -> )
  -> WHERE pl->'migration'->'clusterVersion' = '{"activeVersion": {"internal": 84, "majorVal": 22, "minorVal": 2}}'::JSONB;
  id | status
-----+---------
(0 rows)

Time: 384ms total (execution 384ms / network 0ms)

root@localhost:26257/defaultdb> WITH
  ->   running_migration_jobs AS (
  ->     SELECT id, status
  ->     FROM system.jobs
  ->     WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')
  ->       AND job_type = 'MIGRATION'
  ->   ),
  ->   payloads AS (
  ->     SELECT job_id, value
  ->     FROM system.job_info AS payload
  ->     WHERE info_key = 'legacy_payload'
  ->       AND job_id IN (SELECT id FROM running_migration_jobs)
  ->     ORDER BY written DESC
  ->   )
  -> SELECT id, status FROM (
  ->   SELECT id, status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payloads.value, false) AS pl
  ->   FROM running_migration_jobs AS j
  ->   INNER JOIN payloads ON j.id = payloads.job_id
  -> );
  id | status
-----+---------
(0 rows)

Time: 3ms total (execution 2ms / network 0ms)
```

Note that the new query will return 2 rows if we happen to have 2 legacy_payload keys for a given job. This will result in an assertion failure. But I think this is reasonable, since we take care to only ever have 1 legacy payload row.
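As a sanity check of that single-payload assumption, a query along these lines (illustrative only, not part of this patch) would surface any job with more than one legacy_payload row:

```sql
-- Illustrative: any row returned here would indicate a duplicated legacy_payload
-- entry, which is what would trip the assertion mentioned above.
SELECT job_id, count(*) AS payload_rows
FROM system.job_info
WHERE info_key = 'legacy_payload'
GROUP BY job_id
HAVING count(*) > 1;
```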
We should do more work to understand contention within the job system, but perhaps speeding up this query will help a bit.

Epic: None

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
3 parents: b87852a + 4398dc4 + 612d49f

File tree

3 files changed: +307 −15 lines


pkg/cmd/roachtest/tests/cdc_bench.go

Lines changed: 259 additions & 0 deletions
```diff
@@ -17,6 +17,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"path/filepath"
+	"strconv"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -26,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
 	"github.com/cockroachdb/errors"
@@ -34,6 +37,7 @@ import (
 )
 
 type cdcBenchScanType string
+type cdcBenchServer string
 type cdcBenchProtocol string
 
 const (
@@ -56,6 +60,10 @@ const (
 	// practice it can.
 	cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"
 
+	cdcBenchNoServer        cdcBenchServer = ""
+	cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
+	cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler
+
 	cdcBenchNoProtocol        cdcBenchProtocol = ""
 	cdcBenchRangefeedProtocol cdcBenchProtocol = "rangefeed" // basic rangefeed protocol
 	cdcBenchMuxProtocol       cdcBenchProtocol = "mux"       // multiplexing rangefeed protocol
@@ -64,6 +72,7 @@ const (
 var (
 	cdcBenchScanTypes = []cdcBenchScanType{
 		cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan}
+	cdcBenchServers   = []cdcBenchServer{cdcBenchProcessorServer} // TODO(erikgrinaker): scheduler
 	cdcBenchProtocols = []cdcBenchProtocol{cdcBenchRangefeedProtocol, cdcBenchMuxProtocol}
 )
 
@@ -99,6 +108,53 @@ func registerCDCBench(r registry.Registry) {
 			}
 		}
 	}
+
+	// Workload impact benchmarks.
+	for _, readPercent := range []int{0, 100} {
+		for _, ranges := range []int64{100, 100000} {
+			readPercent, ranges := readPercent, ranges // pin loop variables
+			const (
+				nodes  = 5 // excluding coordinator and workload nodes
+				cpus   = 16
+				format = "json"
+			)
+
+			// Control run that only runs the workload, with no changefeed.
+			r.Add(registry.TestSpec{
+				Name: fmt.Sprintf(
+					"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/control",
+					readPercent, nodes, cpus, ranges),
+				Owner:           registry.OwnerCDC,
+				Benchmark:       true,
+				Cluster:         r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
+				RequiresLicense: true,
+				Timeout:         time.Hour,
+				Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+					runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "", "")
+				},
+			})
+
+			// Workloads with a concurrent changefeed running.
+			for _, server := range cdcBenchServers {
+				for _, protocol := range cdcBenchProtocols {
+					server, protocol := server, protocol // pin loop variables
+					r.Add(registry.TestSpec{
+						Name: fmt.Sprintf(
+							"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/server=%s/protocol=%s/format=%s/sink=null",
+							readPercent, nodes, cpus, ranges, server, protocol, format),
+						Owner:           registry.OwnerCDC,
+						Benchmark:       true,
+						Cluster:         r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
+						RequiresLicense: true,
+						Timeout:         time.Hour,
+						Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+							runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, protocol, format)
+						},
+					})
+				}
+			}
+		}
+	}
 }
 
 func formatSI(num int64) string {
@@ -263,6 +319,209 @@ func runCDCBenchScan(
 	m.Wait()
 }
 
+// runCDCBenchWorkload runs a KV workload on top of a changefeed, measuring the
+// workload throughput and latency. Rangefeeds are configured to backpressure
+// writers, which yields reliable results for the full write+emission cost.
+// The workload results (throughput and latency) can be compared to separate
+// control runs that only run the workload without changefeeds and rangefeeds.
+//
+// It sets up a cluster with N-2 data nodes, and a separate changefeed
+// coordinator node and workload runner.
+func runCDCBenchWorkload(
+	ctx context.Context,
+	t test.Test,
+	c cluster.Cluster,
+	numRanges int64,
+	readPercent int,
+	server cdcBenchServer,
+	protocol cdcBenchProtocol,
+	format string,
+) {
+	const sink = "null://"
+	var (
+		numNodes  = c.Spec().NodeCount
+		nData     = c.Range(1, numNodes-2)
+		nCoord    = c.Node(numNodes - 1)
+		nWorkload = c.Node(numNodes)
+
+		workloadSeed = randutil.NewPseudoSeed()
+		concurrency  = len(nData) * 64
+		duration     = 20 * time.Minute
+		insertCount  = int64(0)
+		cdcEnabled   = true
+	)
+	if readPercent == 100 {
+		insertCount = 1_000_000 // ingest some data to read
+	}
+	// Either of these will disable changefeeds. Make sure they're all disabled.
+	if server == "" || protocol == "" || format == "" {
+		require.Empty(t, server)
+		require.Empty(t, protocol)
+		require.Empty(t, format)
+		cdcEnabled = false
+	}
+
+	// Start data nodes first to place data on them. We'll start the changefeed
+	// coordinator later, since we don't want any data on it.
+	opts, settings := makeCDCBenchOptions()
+	settings.ClusterSettings["kv.rangefeed.enabled"] = strconv.FormatBool(cdcEnabled)
+
+	switch protocol {
+	case cdcBenchMuxProtocol:
+		settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "true"
+	case cdcBenchRangefeedProtocol:
+		settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "false"
+	case cdcBenchNoProtocol:
+	default:
+		t.Fatalf("unknown protocol %q", protocol)
+	}
+
+	c.Put(ctx, t.Cockroach(), "./cockroach")
+	c.Start(ctx, t.L(), opts, settings, nData)
+	m := c.NewMonitor(ctx, nData.Merge(nCoord))
+
+	conn := c.Conn(ctx, t.L(), nData[0])
+	defer conn.Close()
+
+	// Prohibit ranges on the changefeed coordinator.
+	t.L().Printf("configuring zones")
+	for _, target := range getAllZoneTargets(ctx, t, conn) {
+		_, err := conn.ExecContext(ctx, fmt.Sprintf(
+			`ALTER %s CONFIGURE ZONE USING num_replicas=3, constraints='[-node%d]'`, target, nCoord[0]))
+		require.NoError(t, err)
+	}
+
+	// Wait for system ranges to upreplicate.
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// Create and split the workload table.
+	//
+	// NB: don't scatter -- the ranges end up fairly well-distributed anyway, and
+	// the scatter can often fail with 100k ranges.
+	t.L().Printf("creating table with %s ranges", humanize.Comma(numRanges))
+	c.Run(ctx, nWorkload, fmt.Sprintf(
+		`./cockroach workload init kv --splits %d {pgurl:%d}`, numRanges, nData[0]))
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// For read-only workloads, ingest some data. init --insert-count does not use
+	// the standard key generator that the read workload uses, so we have to write
+	// them with a separate write workload first, see:
+	// https://github.com/cockroachdb/cockroach/issues/107874
+	if insertCount > 0 {
+		const batchSize = 1000
+		batches := (insertCount-1)/batchSize + 1 // ceiling division
+		t.L().Printf("ingesting %s rows", humanize.Comma(insertCount))
+		c.Run(ctx, nWorkload, fmt.Sprintf(
+			`./cockroach workload run kv --seed %d --read-percent 0 --batch %d --max-ops %d {pgurl:%d}`,
+			workloadSeed, batchSize, batches, nData[0]))
+	}
+
+	// Now that the ranges are placed, start the changefeed coordinator.
+	t.L().Printf("starting coordinator node")
+	c.Start(ctx, t.L(), opts, settings, nCoord)
+
+	conn = c.Conn(ctx, t.L(), nCoord[0])
+	defer conn.Close()
+
+	// Start the changefeed if enabled. We disable the initial scan, since we
+	// don't care about the historical data.
+	var jobID int
+	var done atomic.Value // time.Time
+	if cdcEnabled {
+		t.L().Printf("starting changefeed")
+		require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
+			`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`,
+			sink, format)).
+			Scan(&jobID))
+
+		// Monitor the changefeed for failures. When the workload finishes, it will
+		// store the completion timestamp in done, and we'll wait for the
+		// changefeed's watermark to reach it.
+		//
+		// The watermark and lag isn't recorded by the benchmark, but we make sure
+		// all data is eventually emitted. It is also helpful for inspection, and we
+		// may want to track or assert on it later. Initially, this asserted that
+		// the changefeed wasn't lagging by more than 1-2 minutes, but with 100k
+		// ranges it was found to sometimes lag by over 8 minutes.
+		m.Go(func(ctx context.Context) error {
+			info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+				switch jobs.Status(info.status) {
+				case jobs.StatusPending, jobs.StatusRunning:
+					doneValue := done.Load()
+					return doneValue != nil && info.highwaterTime.After(doneValue.(time.Time)), nil
+				default:
+					return false, errors.Errorf("unexpected changefeed status %s", info.status)
+				}
+			})
+			if err != nil {
+				return err
+			}
+			t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
+			return nil
+		})
+
+		// Wait for a stable changefeed before starting the workload, by waiting for
+		// the watermark to reach the current time.
+		now := timeutil.Now()
+		t.L().Printf("waiting for changefeed watermark to reach current time (%s)",
+			now.Format(time.RFC3339))
+		info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+			switch jobs.Status(info.status) {
+			case jobs.StatusPending, jobs.StatusRunning:
+				return info.highwaterTime.After(now), nil
+			default:
+				return false, errors.Errorf("unexpected changefeed status %s", info.status)
+			}
+		})
+		require.NoError(t, err)
+		t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
+
+	} else {
+		t.L().Printf("control run, not starting changefeed")
+	}
+
+	// Run the workload and record stats. Make sure to use the same seed, so we
+	// read any rows we wrote above.
+	m.Go(func(ctx context.Context) error {
+		// If there's more than 10,000 replicas per node they may struggle to
+		// maintain RPC connections or liveness, which occasionally fails client
+		// write requests with ambiguous errors. We tolerate errors in this case
+		// until we optimize rangefeeds.
+		//
+		// TODO(erikgrinaker): remove this when benchmarks are stable.
+		var extra string
+		if readPercent < 100 && (numRanges/int64(len(nData))) >= 10000 {
+			extra += ` --tolerate-errors`
+		}
+		t.L().Printf("running workload")
+		err := c.RunE(ctx, nWorkload, fmt.Sprintf(
+			`./cockroach workload run kv --seed %d --histograms=%s/stats.json `+
+				`--concurrency %d --duration %s --write-seq R%d --read-percent %d %s {pgurl:%d-%d}`,
+			workloadSeed, t.PerfArtifactsDir(), concurrency, duration, insertCount, readPercent, extra,
+			nData[0], nData[len(nData)-1]))
+		if err != nil {
+			return err
+		}
+		t.L().Printf("workload completed")
+
+		// When the workload completes, signal the completion time to the changefeed
+		// monitor via done, which will wait for it to fully catch up.
+		if cdcEnabled {
+			now := timeutil.Now()
+			done.Store(now)
+			info, err := getChangefeedInfo(conn, jobID)
+			if err != nil {
+				return err
+			}
+			t.L().Printf("waiting for changefeed watermark to reach %s (lagging by %s)",
+				now.Format(time.RFC3339), now.Sub(info.highwaterTime).Truncate(time.Second))
+		}
		return nil
+	})
+
+	m.Wait()
+}
+
 // getAllZoneTargets returns all zone targets (e.g. "RANGE default", "DATABASE
 // system", etc).
 func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []string {
```
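For reference, the changefeed that `runCDCBenchWorkload` starts against the null sink has the shape below; the second statement is an illustrative way to eyeball the watermark lag by hand while the workload runs (it assumes the usual `SHOW CHANGEFEED JOBS` output and is not something this patch adds):

```sql
-- Changefeed started by the benchmark against the null sink (no initial scan).
CREATE CHANGEFEED FOR kv.kv INTO 'null://' WITH format = 'json', initial_scan = 'no';

-- Illustrative manual check of the changefeed watermark while the workload runs.
SELECT job_id, high_water_timestamp FROM [SHOW CHANGEFEED JOBS];
```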

pkg/upgrade/upgrademanager/manager.go

Lines changed: 44 additions & 14 deletions
```diff
@@ -778,26 +778,56 @@ func (m *Manager) getOrCreateMigrationJob(
 	return alreadyCompleted, alreadyExisting, jobID, nil
 }
 
+const (
+	preJobInfoTableQuery = `
+SELECT id, status
+FROM (
+	SELECT id, status,
+		crdb_internal.pb_to_json(
+			'cockroach.sql.jobs.jobspb.Payload',
+			payload,
+			false -- emit_defaults
+		) AS pl
+	FROM system.jobs
+	WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
+)
+WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB`
+	// postJobInfoQuery avoids the crdb_internal.system_jobs table
+	// to avoid expensive full scans.
+	postJobInfoTableQuery = `
+WITH
+running_migration_jobs AS (
+	SELECT id, status
+	FROM system.jobs
+	WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
+	AND job_type = 'MIGRATION'
+),
+payloads AS (
+	SELECT job_id, value
+	FROM system.job_info AS payload
+	WHERE info_key = 'legacy_payload'
+	AND job_id IN (SELECT id FROM running_migration_jobs)
+	ORDER BY written DESC
+)
+SELECT id, status FROM (
+	SELECT id, status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payloads.value, false) AS pl
+	FROM running_migration_jobs AS j
+	INNER JOIN payloads ON j.id = payloads.job_id
+) WHERE ((pl->'migration')->'clusterVersion') = $1::JSONB`
+)
+
 func (m *Manager) getRunningMigrationJob(
 	ctx context.Context, txn isql.Txn, version roachpb.Version,
 ) (found bool, jobID jobspb.JobID, _ error) {
 	// Wrap the version into a ClusterVersion so that the JSON looks like what the
 	// Payload proto has inside.
 	cv := clusterversion.ClusterVersion{Version: version}
-	const query = `
-SELECT id, status
-FROM (
-	SELECT id,
-		status,
-		crdb_internal.pb_to_json(
-			'cockroach.sql.jobs.jobspb.Payload',
-			payload,
-			false -- emit_defaults
-		) AS pl
-	FROM crdb_internal.system_jobs
-	WHERE status IN ` + jobs.NonTerminalStatusTupleString + `
-)
-WHERE pl->'migration'->'clusterVersion' = $1::JSON;`
+	var query string
+	if m.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
+		query = postJobInfoTableQuery
+	} else {
+		query = preJobInfoTableQuery
+	}
 	jsonMsg, err := protoreflect.MessageToJSON(&cv, protoreflect.FmtFlags{EmitDefaults: false})
 	if err != nil {
 		return false, 0, errors.Wrap(err, "failed to marshal version to JSON")
```
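To re-check the plan shape of the new post-backfill query by hand, one could EXPLAIN it with a literal version predicate (illustrative only; the status list is what `jobs.NonTerminalStatusTupleString` expands to per the commit message, and the JSONB literal is taken from the local example above). Per the plan shown in the commit message, this should hit `jobs@jobs_job_type_idx` and do a lookup join into `job_info` rather than full scans:

```sql
EXPLAIN
WITH
  running_migration_jobs AS (
    SELECT id, status
    FROM system.jobs
    WHERE status IN ('running', 'pending', 'cancel-requested', 'pause-requested', 'reverting', 'paused')
      AND job_type = 'MIGRATION'
  ),
  payloads AS (
    SELECT job_id, value
    FROM system.job_info AS payload
    WHERE info_key = 'legacy_payload'
      AND job_id IN (SELECT id FROM running_migration_jobs)
    ORDER BY written DESC
  )
SELECT id, status FROM (
  SELECT id, status, crdb_internal.pb_to_json('cockroach.sql.jobs.jobspb.Payload', payloads.value, false) AS pl
  FROM running_migration_jobs AS j
  INNER JOIN payloads ON j.id = payloads.job_id
) WHERE ((pl->'migration')->'clusterVersion') = '{"activeVersion": {"internal": 84, "majorVal": 22, "minorVal": 2}}'::JSONB;
```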

pkg/upgrade/upgrademanager/manager_external_test.go

Lines changed: 4 additions & 1 deletion
```diff
@@ -155,7 +155,10 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
 			created_by_type,
 			created_by_id,
 			claim_session_id,
-			claim_instance_id
+			claim_instance_id,
+			0,
+			NULL,
+			job_type
 		FROM crdb_internal.system_jobs
 		WHERE id = $1
 	)
```
