
Commit fd18c1b

craig[bot], yuzefovich, and dt committed
148400: importer: purge all PGDUMP and MYSQLDUMP related code r=yuzefovich a=yuzefovich

All this code is no longer used. Note that there are still some things left to clean up (for example, we can assume that no tables are new, and we don't need to support creating new schemas in IMPORT either), but all of those will be addressed separately. This commit focuses only on removing the related workload readers as well as the roachpb formats.

Epic: None
Release note: None

148420: execinfrapb: minor cleanup of Expression r=yuzefovich a=yuzefovich

Removed the unused version field and fixed up a comment.

Epic: None
Release note: None

148441: jobs: delete dead code r=dt a=dt

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: David Taylor <[email protected]>
4 parents (bf5dd77 + d9aa1c8 + 1b82c9f + 9262d84), commit fd18c1b

29 files changed: +190 -4274 lines

DEPS.bzl

Lines changed: 24 additions & 444 deletions
Large diffs are not rendered by default.

build/bazelutil/distdir_files.bzl

Lines changed: 8 additions & 50 deletions
Large diffs are not rendered by default.

go.mod

Lines changed: 0 additions & 3 deletions
@@ -262,7 +262,6 @@ require (
   gopkg.in/yaml.v2 v2.4.0
   gopkg.in/yaml.v3 v3.0.1
   honnef.co/go/tools v0.5.1
-  vitess.io/vitess v0.0.0-00010101000000-000000000000
 )

 require (
@@ -494,8 +493,6 @@ replace github.com/olekukonko/tablewriter => github.com/cockroachdb/tablewriter

 replace github.com/abourget/teamcity => github.com/cockroachdb/teamcity v0.0.0-20180905144921-8ca25c33eb11

-replace vitess.io/vitess => github.com/cockroachdb/vitess v0.0.0-20210218160543-54524729cc82
-
 replace gopkg.in/yaml.v2 => github.com/cockroachdb/yaml v0.0.0-20210825132133-2d6955c8edbc

 replace github.com/docker/docker => github.com/moby/moby v24.0.6+incompatible

go.sum

Lines changed: 0 additions & 115 deletions
Large diffs are not rendered by default.

pkg/backup/restore_job.go

Lines changed: 2 additions & 3 deletions
@@ -985,9 +985,8 @@ func createImportingDescriptors(
   for _, desc := range sqlDescs {
     // Decide which offline tables to include in the restore:
     //
-    // - An offline table created by RESTORE or IMPORT PGDUMP is
-    // fully discarded. The table will not exist in the restoring
-    // cluster.
+    // - An offline table created by RESTORE is fully discarded. The table
+    // will not exist in the restoring cluster.
     //
     // - An offline table undergoing an IMPORT INTO in traditional
     // restore has all importing data elided in the restore

pkg/build/starlarkutil/starlarkutil_test.go

Lines changed: 2 additions & 13 deletions
@@ -15,14 +15,6 @@ func TestGetExistingMirrorsFromDepsBzl(t *testing.T) {
   depsbzl := `# leading comment
 load("@bazel_gazelle//:deps.bzl", "go_repository")
 def go_deps():
-    go_repository(
-        name = "io_vitess_vitess",
-        build_file_proto_mode = "disable_global",
-        importpath = "vitess.io/vitess",
-        sha256 = "FAKESHA256",
-        strip_prefix = "github.com/cockroachdb/[email protected]",
-        urls = ["https://example.com/fakeurl"],
-    )
     go_repository(
         name = "com_github_akavel_rsrc",
         build_file_proto_mode = "disable_global",
@@ -41,11 +33,8 @@ def go_deps():
 `
   mirrors, err := downloadableArtifactsFromDepsBzl(depsbzl)
   require.NoError(t, err)
-  require.Equal(t, len(mirrors), 2)
-  mirror := mirrors["io_vitess_vitess"]
-  require.Equal(t, mirror.URL, "https://example.com/fakeurl")
-  require.Equal(t, mirror.Sha256, "FAKESHA256")
-  mirror = mirrors["com_github_alecthomas_units"]
+  require.Equal(t, len(mirrors), 1)
+  mirror := mirrors["com_github_alecthomas_units"]
   require.Equal(t, mirror.URL, "https://foo/bar.zip")
   require.Equal(t, mirror.Sha256, "abcdefghij")
 }

pkg/clusterversion/cockroach_versions.go

Lines changed: 0 additions & 20 deletions
@@ -187,9 +187,6 @@ const (

   TODO_Delete_V25_1_Start

-  // TODO_Delete_V25_1_AddJobsTables added new jobs tables.
-  TODO_Delete_V25_1_AddJobsTables
-
   // TODO_Delete_V25_1_AddRangeForceFlushKey adds the RangeForceFlushKey, a replicated
   // range-ID local key, which is written below raft.
   TODO_Delete_V25_1_AddRangeForceFlushKey
@@ -203,19 +200,6 @@ const (
   // that are part of the XA two-phase commit protocol.
   TODO_Delete_V25_1_PreparedTransactionsTable

-  // TODO_Delete_V25_1_AddJobsColumns added new columns to system.jobs.
-  TODO_Delete_V25_1_AddJobsColumns
-
-  // TODO_Delete_V25_1_JobsWritesFence is an empty version that is used to add a "fence"
-  // between the column addition version and the backfill version. This allows
-  // the backfill version's upgrade step to make the assumption that all nodes
-  // will be writing to the new columns, since moving from fence to backfill can
-  // only start once no nodes are still on add-columnns.
-  TODO_Delete_V25_1_JobsWritesFence
-
-  // TODO_Delete_V25_1_JobsBackfill backfills the new jobs tables and columns.
-  TODO_Delete_V25_1_JobsBackfill
-
   // V25_1 is CockroachDB v25.1. It's used for all v25.1.x patch releases.
   V25_1

@@ -271,13 +255,9 @@ var versionTable = [numKeys]roachpb.Version{
   // v25.1 versions. Internal versions must be even.
   TODO_Delete_V25_1_Start: {Major: 24, Minor: 3, Internal: 2},

-  TODO_Delete_V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4},
   TODO_Delete_V25_1_AddRangeForceFlushKey: {Major: 24, Minor: 3, Internal: 8},
   TODO_Delete_V25_1_BatchStreamRPC: {Major: 24, Minor: 3, Internal: 10},
   TODO_Delete_V25_1_PreparedTransactionsTable: {Major: 24, Minor: 3, Internal: 12},
-  TODO_Delete_V25_1_AddJobsColumns: {Major: 24, Minor: 3, Internal: 14},
-  TODO_Delete_V25_1_JobsWritesFence: {Major: 24, Minor: 3, Internal: 16},
-  TODO_Delete_V25_1_JobsBackfill: {Major: 24, Minor: 3, Internal: 18},

   V25_1: {Major: 25, Minor: 1, Internal: 0},
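
Background on why these keys can simply be dropped: a cluster-version gate only has an effect while some node may still be below the gated version. Once v25.1 is the minimum, every AtLeast check against a 24.3-x internal version is always true, so both the check and the key it references are dead code. The following is a minimal, self-contained Go sketch of that comparison; the Version type and AtLeast method here are simplified stand-ins, not the real clusterversion or roachpb API.

package main

import "fmt"

// Version is a simplified stand-in for a cluster version: (Major, Minor, Internal).
type Version struct{ Major, Minor, Internal int32 }

// AtLeast reports whether v is at or above o, which is how a version gate
// decides whether the gated feature can be assumed active on every node.
func (v Version) AtLeast(o Version) bool {
    if v.Major != o.Major {
        return v.Major > o.Major
    }
    if v.Minor != o.Minor {
        return v.Minor > o.Minor
    }
    return v.Internal >= o.Internal
}

func main() {
    // Once every node is guaranteed to run v25.1 or later, a gate keyed at
    // 24.3-14 (the old AddJobsColumns internal version) always passes, so the
    // check and the legacy branch it guarded can be deleted.
    active := Version{Major: 25, Minor: 1}
    gate := Version{Major: 24, Minor: 3, Internal: 14}
    fmt.Println(active.AtLeast(gate)) // true
}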

pkg/cmd/cr2pg/sqlstream/stream.go

Lines changed: 0 additions & 2 deletions
@@ -17,8 +17,6 @@ import (
   "github.com/cockroachdb/errors"
 )

-// Modified from importer/read_import_pgdump.go.
-
 // Stream streams an io.Reader into tree.Statements.
 type Stream struct {
   scan *bufio.Scanner

pkg/jobs/registry.go

Lines changed: 11 additions & 49 deletions
@@ -483,7 +483,7 @@ func batchJobInsertStmt(
     return "", nil, nil, errors.NewAssertionErrorWithWrappedErrf(err, "failed to make timestamp for creation of job")
   }
   instanceID := r.ID()
-  columns := []string{`id`, `created`, `status`, `claim_session_id`, `claim_instance_id`, `job_type`}
+  columns := []string{`id`, `created`, `status`, `claim_session_id`, `claim_instance_id`, `job_type`, `owner`, `description`}
   valueFns := map[string]func(*Job) (interface{}, error){
     `id`: func(job *Job) (interface{}, error) { return job.ID(), nil },
     `created`: func(job *Job) (interface{}, error) { return created, nil },
@@ -494,12 +494,8 @@
       payload := job.Payload()
       return payload.Type().String(), nil
     },
-  }
-
-  if schemaVersion.AtLeast(clusterversion.TODO_Delete_V25_1_AddJobsColumns.Version()) {
-    columns = append(columns, `owner`, `description`)
-    valueFns[`owner`] = func(job *Job) (interface{}, error) { return job.Payload().UsernameProto.Decode().Normalized(), nil }
-    valueFns[`description`] = func(job *Job) (interface{}, error) { return job.Payload().Description, nil }
+    `owner`: func(job *Job) (interface{}, error) { return job.Payload().UsernameProto.Decode().Normalized(), nil },
+    `description`: func(job *Job) (interface{}, error) { return job.Payload().Description, nil },
   }

   appendValues := func(job *Job, vals *[]interface{}) (err error) {
@@ -590,15 +586,9 @@ func (r *Registry) CreateJobWithTxn(
     return errors.NewAssertionErrorWithWrappedErrf(err, "failed to construct job created timestamp")
   }

-  cols := []string{"id", "created", "status", "claim_session_id", "claim_instance_id", "job_type"}
-  vals := []interface{}{jobID, created, StateRunning, s.ID().UnsafeBytes(), r.ID(), jobType.String()}
-  v, err := txn.GetSystemSchemaVersion(ctx)
-  if err != nil {
-    return err
-  }
-  if v.AtLeast(clusterversion.TODO_Delete_V25_1_AddJobsColumns.Version()) {
-    cols = append(cols, "owner", "description")
-    vals = append(vals, j.mu.payload.UsernameProto.Decode().Normalized(), j.mu.payload.Description)
+  cols := []string{"id", "created", "status", "claim_session_id", "claim_instance_id", "job_type", "owner", "description"}
+  vals := []interface{}{
+    jobID, created, StateRunning, s.ID().UnsafeBytes(), r.ID(), jobType.String(), j.mu.payload.UsernameProto.Decode().Normalized(), j.mu.payload.Description,
   }

   totalNumCols := len(cols)
@@ -730,18 +720,9 @@ func (r *Registry) CreateAdoptableJobWithTxn(
   }
   typ := j.mu.payload.Type().String()

-  cols := []string{"id", "created", "status", "created_by_type", "created_by_id", "job_type"}
-  placeholders := []string{"$1", "now() at time zone 'utc'", "$2", "$3", "$4", "$5"}
-  vals := []interface{}{jobID, StateRunning, createdByType, createdByID, typ}
-  v, err := txn.GetSystemSchemaVersion(ctx)
-  if err != nil {
-    return err
-  }
-  if v.AtLeast(clusterversion.TODO_Delete_V25_1_AddJobsColumns.Version()) {
-    cols = append(cols, "owner", "description")
-    placeholders = append(placeholders, "$6", "$7")
-    vals = append(vals, j.mu.payload.UsernameProto.Decode().Normalized(), j.mu.payload.Description)
-  }
+  cols := []string{"id", "created", "status", "created_by_type", "created_by_id", "job_type", "owner", "description"}
+  placeholders := []string{"$1", "now() at time zone 'utc'", "$2", "$3", "$4", "$5", "$6", "$7"}
+  vals := []interface{}{jobID, StateRunning, createdByType, createdByID, typ, j.mu.payload.UsernameProto.Decode().Normalized(), j.mu.payload.Description}

   // Insert the job row, but do not set a `claim_session_id`. By not
   // setting the claim, the job can be adopted by any node and will
@@ -1275,20 +1256,11 @@ func (r *Registry) cleanupOldJobsPage(
   }

   counts := make(map[string]int)
-  for i, tbl := range jobMetadataTables {
+  for _, tbl := range jobMetadataTables {
     var deleted int
     if err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
       // Tables other than job_info -- the 0th -- are only present if the txn is
       // running at a version that includes them.
-      if i > 0 {
-        v, err := txn.GetSystemSchemaVersion(ctx)
-        if err != nil {
-          return err
-        }
-        if v.Less(clusterversion.TODO_Delete_V25_1_AddJobsTables.Version()) {
-          return nil
-        }
-      }
       deleted, err = txn.Exec(ctx, redact.RedactableString("gc-job-"+tbl), txn.KV(),
         "DELETE FROM system."+tbl+" WHERE job_id = ANY($1)", toDelete,
       )
@@ -1337,17 +1309,7 @@ func (r *Registry) DeleteTerminalJobByID(ctx context.Context, id jobspb.JobID) e
     if err != nil {
       return err
     }
-    for i, tbl := range jobMetadataTables {
-      if i > 0 {
-        v, err := txn.GetSystemSchemaVersion(ctx)
-        if err != nil {
-          return err
-        }
-        if v.Less(clusterversion.TODO_Delete_V25_1_AddJobsTables.Version()) {
-          break
-        }
-      }
-
+    for _, tbl := range jobMetadataTables {
       _, err = txn.Exec(
         ctx, redact.RedactableString("delete-job-"+tbl), txn.KV(),
         "DELETE FROM system."+tbl+" WHERE job_id = $1", id,

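With the version gates gone, the insert paths above always use the full column list. Below is a rough, self-contained Go sketch of the columns/placeholders/values shape they share; the column names come from the diff, while the bound values and the plain fmt/strings program are illustrative assumptions, not the real jobs or isql API.

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Fixed column list with one placeholder per bound value, mirroring the
    // shape of the now-unconditional statement in CreateAdoptableJobWithTxn;
    // `created` is computed by the database rather than bound as a parameter.
    cols := []string{"id", "created", "status", "created_by_type", "created_by_id", "job_type", "owner", "description"}
    placeholders := []string{"$1", "now() at time zone 'utc'", "$2", "$3", "$4", "$5", "$6", "$7"}
    vals := []interface{}{int64(42), "running", "crdb_internal", int64(7), "IMPORT", "root", "example job"}

    stmt := fmt.Sprintf(
        "INSERT INTO system.jobs (%s) VALUES (%s)",
        strings.Join(cols, ", "),
        strings.Join(placeholders, ", "),
    )
    fmt.Println(stmt)
    fmt.Println(vals...) // bound positionally as $1 through $7
}
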
pkg/jobs/update.go

Lines changed: 60 additions & 69 deletions
@@ -12,7 +12,6 @@ import (
   "strings"
   "time"

-  "github.com/cockroachdb/cockroach/pkg/clusterversion"
   "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
   "github.com/cockroachdb/cockroach/pkg/sql/isql"
   "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -281,96 +280,88 @@ WHERE id = $1
     }
   }

-  v, err := u.txn.GetSystemSchemaVersion(ctx)
-  if err != nil {
-    return err
-  }
-  if v.AtLeast(clusterversion.TODO_Delete_V25_1_AddJobsTables.Version()) {
-    if ju.md.State != "" && ju.md.State != state {
-      if err := j.Messages().Record(ctx, u.txn, "state", string(ju.md.State)); err != nil {
+  if ju.md.State != "" && ju.md.State != state {
+    if err := j.Messages().Record(ctx, u.txn, "state", string(ju.md.State)); err != nil {
+      return err
+    }
+    // If we are changing state, we should clear out the status, unless
+    // we are about to set it to something instead.
+    if progress == nil || progress.StatusMessage == "" {
+      if err := j.StatusStorage().Clear(ctx, u.txn); err != nil {
         return err
       }
-      // If we are changing state, we should clear out the status, unless
-      // we are about to set it to something instead.
-      if progress == nil || progress.StatusMessage == "" {
-        if err := j.StatusStorage().Clear(ctx, u.txn); err != nil {
-          return err
-        }
-      }
     }
+  }

-    if progress != nil {
-      var ts hlc.Timestamp
-      if hwm := progress.GetHighWater(); hwm != nil {
-        ts = *hwm
-      }
+  if progress != nil {
+    var ts hlc.Timestamp
+    if hwm := progress.GetHighWater(); hwm != nil {
+      ts = *hwm
+    }

-      if err := j.ProgressStorage().Set(ctx, u.txn, float64(progress.GetFractionCompleted()), ts); err != nil {
-        return err
-      }
+    if err := j.ProgressStorage().Set(ctx, u.txn, float64(progress.GetFractionCompleted()), ts); err != nil {
+      return err
+    }

-      if progress.StatusMessage != beforeProgress.StatusMessage {
-        if err := j.StatusStorage().Set(ctx, u.txn, progress.StatusMessage); err != nil {
-          return err
-        }
+    if progress.StatusMessage != beforeProgress.StatusMessage {
+      if err := j.StatusStorage().Set(ctx, u.txn, progress.StatusMessage); err != nil {
+        return err
       }
+    }

-      if progress.TraceID != beforeProgress.TraceID {
-        if err := j.Messages().Record(ctx, u.txn, "trace-id", fmt.Sprintf("%d", progress.TraceID)); err != nil {
-          return err
-        }
+    if progress.TraceID != beforeProgress.TraceID {
+      if err := j.Messages().Record(ctx, u.txn, "trace-id", fmt.Sprintf("%d", progress.TraceID)); err != nil {
+        return err
       }
     }
   }
-  if v.AtLeast(clusterversion.TODO_Delete_V25_1_AddJobsColumns.Version()) {

-    vals := []interface{}{j.ID()}
+  vals := []interface{}{j.ID()}

-    var update strings.Builder
+  var update strings.Builder

-    if payloadBytes != nil {
-      if beforePayload.Description != payload.Description {
-        if update.Len() > 0 {
-          update.WriteString(", ")
-        }
-        vals = append(vals, payload.Description)
-        fmt.Fprintf(&update, "description = $%d", len(vals))
+  if payloadBytes != nil {
+    if beforePayload.Description != payload.Description {
+      if update.Len() > 0 {
+        update.WriteString(", ")
       }
+      vals = append(vals, payload.Description)
+      fmt.Fprintf(&update, "description = $%d", len(vals))
+    }

-      if beforePayload.UsernameProto.Decode() != payload.UsernameProto.Decode() {
-        if update.Len() > 0 {
-          update.WriteString(", ")
-        }
-        vals = append(vals, payload.UsernameProto.Decode().Normalized())
-        fmt.Fprintf(&update, "owner = $%d", len(vals))
+    if beforePayload.UsernameProto.Decode() != payload.UsernameProto.Decode() {
+      if update.Len() > 0 {
+        update.WriteString(", ")
       }
+      vals = append(vals, payload.UsernameProto.Decode().Normalized())
+      fmt.Fprintf(&update, "owner = $%d", len(vals))
+    }

-      if beforePayload.Error != payload.Error {
-        if update.Len() > 0 {
-          update.WriteString(", ")
-        }
-        vals = append(vals, payload.Error)
-        fmt.Fprintf(&update, "error_msg = $%d", len(vals))
+    if beforePayload.Error != payload.Error {
+      if update.Len() > 0 {
+        update.WriteString(", ")
       }
+      vals = append(vals, payload.Error)
+      fmt.Fprintf(&update, "error_msg = $%d", len(vals))
+    }

-      if beforePayload.FinishedMicros != payload.FinishedMicros {
-        if update.Len() > 0 {
-          update.WriteString(", ")
-        }
-        vals = append(vals, time.UnixMicro(payload.FinishedMicros))
-        fmt.Fprintf(&update, "finished = $%d", len(vals))
+    if beforePayload.FinishedMicros != payload.FinishedMicros {
+      if update.Len() > 0 {
+        update.WriteString(", ")
       }
-
+      vals = append(vals, time.UnixMicro(payload.FinishedMicros))
+      fmt.Fprintf(&update, "finished = $%d", len(vals))
     }
-    if len(vals) > 1 {
-      stmt := fmt.Sprintf("UPDATE system.jobs SET %s WHERE id = $1", update.String())
-      if _, err := u.txn.ExecEx(
-        ctx, "job-update-row", u.txn.KV(),
-        sessiondata.NodeUserSessionDataOverride,
-        stmt, vals...,
-      ); err != nil {
-        return err
-      }
+
+  }
+  if len(vals) > 1 {
+    stmt := fmt.Sprintf("UPDATE system.jobs SET %s WHERE id = $1", update.String())
+    if _, err := u.txn.ExecEx(
+      ctx, "job-update-row", u.txn.KV(),
+      sessiondata.NodeUserSessionDataOverride,
+      stmt, vals...,
+    ); err != nil {
+      return err
     }
   }
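
The now-unconditional body above assembles its UPDATE dynamically: each changed column appends "name = $n" to a strings.Builder, with $1 reserved for the job id, and the statement is only issued when something besides the id was appended. The following is a minimal standalone Go sketch of that pattern; the set helper closure and the changed values are invented for illustration, and no database is involved.

package main

import (
    "fmt"
    "strings"
)

func main() {
    // $1 is always the job id; each changed column takes the next placeholder.
    vals := []interface{}{int64(42)}
    var update strings.Builder

    set := func(column string, value interface{}) {
        if update.Len() > 0 {
            update.WriteString(", ")
        }
        vals = append(vals, value)
        fmt.Fprintf(&update, "%s = $%d", column, len(vals))
    }

    // Pretend only the description and owner changed in this update.
    set("description", "IMPORT INTO t ...")
    set("owner", "root")

    if len(vals) > 1 { // only issue a statement if something actually changed
        stmt := fmt.Sprintf("UPDATE system.jobs SET %s WHERE id = $1", update.String())
        fmt.Println(stmt) // UPDATE system.jobs SET description = $2, owner = $3 WHERE id = $1
        fmt.Println(vals...)
    }
}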
