Skip to content

Commit 9e03428

Browse files
committed
importer: remove schema-publishing related code
The ability to create new schemas as part of the IMPORT was added in f36a95a and it was only supported in PGDUMP. That code is now gone, so we can remove the schema-publishing related code too. Release note: None
1 parent 4ed59a9 commit 9e03428

File tree

3 files changed

+3
-253
lines changed

3 files changed

+3
-253
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -687,16 +687,12 @@ message ImportDetails {
687687
repeated string target_cols = 21;
688688
reserved 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 20;
689689
}
690-
message Schema {
691-
sqlbase.SchemaDescriptor desc = 1;
692-
}
693690
message Type {
694691
sqlbase.TypeDescriptor desc = 1;
695692
}
696693
// TODO(yuzefovich): we now can only support importing into a single table via
697694
// a single IMPORT job, so we could change this field.
698695
repeated Table tables = 1 [(gogoproto.nullable) = false];
699-
repeated Schema schemas = 23 [(gogoproto.nullable) = false];
700696
repeated Type types = 26 [(gogoproto.nullable) = false];
701697

702698
repeated string uris = 2 [(gogoproto.customname) = "URIs"];
@@ -710,7 +706,6 @@ message ImportDetails {
710706
];
711707

712708
bool prepare_complete = 12;
713-
bool schemas_published = 24;
714709
bool tables_published = 13;
715710

716711
// If the database being imported into is a multi-region database, then this
@@ -719,7 +714,7 @@ message ImportDetails {
719714
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.RegionName"
720715
];
721716

722-
reserved 4, 7, 8, 9, 10, 11, 14, 22, 25;
717+
reserved 4, 7, 8, 9, 10, 11, 14, 22, 23, 24, 25;
723718

724719
// next val: 28
725720
}

pkg/sql/importer/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ go_library(
6969
"//pkg/sql/row",
7070
"//pkg/sql/rowenc",
7171
"//pkg/sql/rowexec",
72-
"//pkg/sql/sem/catconstants",
7372
"//pkg/sql/sem/catid",
7473
"//pkg/sql/sem/eval",
7574
"//pkg/sql/sem/idxtype",

pkg/sql/importer/import_job.go

Lines changed: 2 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
2929
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
3030
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
31-
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
3231
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
3332
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
3433
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
3534
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
3635
"github.com/cockroachdb/cockroach/pkg/sql/isql"
37-
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
3836
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
3937
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
4038
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
@@ -109,14 +107,6 @@ var performConstraintValidation = settings.RegisterBoolSetting(
109107
settings.WithUnsafe,
110108
)
111109

112-
type preparedSchemaMetadata struct {
113-
schemaPreparedDetails jobspb.ImportDetails
114-
schemaRewrites jobspb.DescRewriteMap
115-
newSchemaIDToName map[descpb.ID]string
116-
oldSchemaIDToName map[descpb.ID]string
117-
queuedSchemaJobs []jobspb.JobID
118-
}
119-
120110
// Resume is part of the jobs.Resumer interface.
121111
func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
122112
p := execCtx.(sql.JobExecContext)
@@ -129,35 +119,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
129119
if details.Tables != nil {
130120
// Skip prepare stage on job resumption, if it has already been completed.
131121
if !details.PrepareComplete {
132-
var schemaMetadata *preparedSchemaMetadata
133122
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(
134123
ctx context.Context, txn isql.Txn, descsCol *descs.Collection,
135124
) error {
136125
var preparedDetails jobspb.ImportDetails
137-
schemaMetadata = &preparedSchemaMetadata{
138-
newSchemaIDToName: make(map[descpb.ID]string),
139-
oldSchemaIDToName: make(map[descpb.ID]string),
140-
}
141126
var err error
142127
curDetails := details
143-
if len(details.Schemas) != 0 {
144-
schemaMetadata, err = r.prepareSchemasForIngestion(ctx, p, curDetails, txn, descsCol)
145-
if err != nil {
146-
return err
147-
}
148-
curDetails = schemaMetadata.schemaPreparedDetails
149-
}
150128

151-
// The public schema is expected to always be present in the database for 22.2+.
152-
dbDesc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Database(ctx, details.ParentID)
153-
if err != nil {
154-
return err
155-
}
156-
schemaMetadata.oldSchemaIDToName[dbDesc.GetSchemaID(catconstants.PublicSchemaName)] = catconstants.PublicSchemaName
157-
schemaMetadata.newSchemaIDToName[dbDesc.GetSchemaID(catconstants.PublicSchemaName)] = catconstants.PublicSchemaName
158-
159-
preparedDetails, err = r.prepareTablesForIngestion(ctx, p, curDetails, txn.KV(), descsCol,
160-
schemaMetadata)
129+
preparedDetails, err = r.prepareTablesForIngestion(ctx, p, curDetails, txn.KV(), descsCol)
161130
if err != nil {
162131
return err
163132
}
@@ -186,9 +155,6 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
186155
prev := md.Payload.DescriptorIDs
187156
if prev == nil {
188157
var descriptorIDs []descpb.ID
189-
for _, schema := range preparedDetails.Schemas {
190-
descriptorIDs = append(descriptorIDs, schema.Desc.GetID())
191-
}
192158
for _, table := range preparedDetails.Tables {
193159
descriptorIDs = append(descriptorIDs, table.Desc.GetID())
194160
}
@@ -201,27 +167,11 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
201167
return err
202168
}
203169

204-
// Run the queued job which updates the database descriptor to contain the
205-
// newly created schemas.
206-
// NB: Seems like the registry eventually adopts the job anyways but this
207-
// is in keeping with the semantics we use when creating a schema during
208-
// sql execution. Namely, queue job in the txn which creates the schema
209-
// desc and run once the txn has committed.
210-
if err := p.ExecCfg().JobRegistry.Run(ctx, schemaMetadata.queuedSchemaJobs); err != nil {
211-
return err
212-
}
213-
214170
// Re-initialize details after prepare step.
215171
details = r.job.Details().(jobspb.ImportDetails)
216172
emitImportJobEvent(ctx, p, jobs.StateRunning, r.job)
217173
}
218174

219-
// Create a mapping from schemaID to schemaName.
220-
schemaIDToName := make(map[descpb.ID]string)
221-
for _, i := range details.Schemas {
222-
schemaIDToName[i.Desc.GetID()] = i.Desc.GetName()
223-
}
224-
225175
for _, i := range details.Tables {
226176
var tableName string
227177
if i.Name != "" {
@@ -329,10 +279,6 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
329279
return err
330280
}
331281

332-
if err := r.publishSchemas(ctx, p.ExecCfg()); err != nil {
333-
return err
334-
}
335-
336282
if err := r.publishTables(ctx, p.ExecCfg(), res); err != nil {
337283
return err
338284
}
@@ -371,7 +317,6 @@ func (r *importResumer) prepareTablesForIngestion(
371317
details jobspb.ImportDetails,
372318
txn *kv.Txn,
373319
descsCol *descs.Collection,
374-
schemaMetadata *preparedSchemaMetadata,
375320
) (jobspb.ImportDetails, error) {
376321
importDetails := details
377322
importDetails.Tables = make([]jobspb.ImportDetails_Table, len(details.Tables))
@@ -447,94 +392,6 @@ func prepareExistingTablesForIngestion(
447392
return importing.TableDesc(), nil
448393
}
449394

450-
// prepareSchemasForIngestion is responsible for assigning the created schema
451-
// descriptors actual IDs, updating the parent DB with references to the new
452-
// schemas and writing the schema descriptors to disk.
453-
func (r *importResumer) prepareSchemasForIngestion(
454-
ctx context.Context,
455-
p sql.JobExecContext,
456-
details jobspb.ImportDetails,
457-
txn isql.Txn,
458-
descsCol *descs.Collection,
459-
) (*preparedSchemaMetadata, error) {
460-
schemaMetadata := &preparedSchemaMetadata{
461-
schemaPreparedDetails: details,
462-
newSchemaIDToName: make(map[descpb.ID]string),
463-
oldSchemaIDToName: make(map[descpb.ID]string),
464-
}
465-
466-
schemaMetadata.schemaPreparedDetails.Schemas = make([]jobspb.ImportDetails_Schema,
467-
len(details.Schemas))
468-
469-
desc, err := descsCol.MutableByID(txn.KV()).Desc(ctx, details.ParentID)
470-
if err != nil {
471-
return nil, err
472-
}
473-
474-
dbDesc, ok := desc.(*dbdesc.Mutable)
475-
if !ok {
476-
return nil, errors.Newf("expected ID %d to refer to the database being imported into",
477-
details.ParentID)
478-
}
479-
480-
schemaMetadata.schemaRewrites = make(jobspb.DescRewriteMap)
481-
mutableSchemaDescs := make([]*schemadesc.Mutable, 0)
482-
for _, desc := range details.Schemas {
483-
schemaMetadata.oldSchemaIDToName[desc.Desc.GetID()] = desc.Desc.GetName()
484-
newMutableSchemaDescriptor := schemadesc.NewBuilder(desc.Desc).BuildCreatedMutable().(*schemadesc.Mutable)
485-
486-
// Verification steps have passed, generate a new schema ID. We do this
487-
// last because we want to avoid calling GenerateUniqueDescID if there's
488-
// any kind of error in the prior stages of import.
489-
id, err := p.ExecCfg().DescIDGenerator.GenerateUniqueDescID(ctx)
490-
if err != nil {
491-
return nil, err
492-
}
493-
newMutableSchemaDescriptor.Version = 1
494-
newMutableSchemaDescriptor.ID = id
495-
mutableSchemaDescs = append(mutableSchemaDescs, newMutableSchemaDescriptor)
496-
497-
schemaMetadata.newSchemaIDToName[id] = newMutableSchemaDescriptor.GetName()
498-
499-
// Update the parent database with this schema information.
500-
dbDesc.AddSchemaToDatabase(newMutableSchemaDescriptor.Name,
501-
descpb.DatabaseDescriptor_SchemaInfo{ID: newMutableSchemaDescriptor.ID})
502-
503-
schemaMetadata.schemaRewrites[desc.Desc.ID] = &jobspb.DescriptorRewrite{
504-
ID: id,
505-
}
506-
}
507-
508-
// Queue a job to write the updated database descriptor.
509-
schemaMetadata.queuedSchemaJobs, err = writeNonDropDatabaseChange(ctx, dbDesc, txn, descsCol, p,
510-
fmt.Sprintf("updating parent database %s when importing new schemas", dbDesc.GetName()))
511-
if err != nil {
512-
return nil, err
513-
}
514-
515-
// Finally create the schemas on disk.
516-
for i, mutDesc := range mutableSchemaDescs {
517-
b := txn.KV().NewBatch()
518-
kvTrace := p.ExtendedEvalContext().Tracing.KVTracingEnabled()
519-
if err := descsCol.WriteDescToBatch(ctx, kvTrace, mutDesc, b); err != nil {
520-
return nil, err
521-
}
522-
if !mutDesc.SkipNamespace() {
523-
if err := descsCol.InsertNamespaceEntryToBatch(ctx, kvTrace, mutDesc, b); err != nil {
524-
return nil, err
525-
}
526-
}
527-
if err := txn.KV().Run(ctx, b); err != nil {
528-
return nil, err
529-
}
530-
schemaMetadata.schemaPreparedDetails.Schemas[i] = jobspb.ImportDetails_Schema{
531-
Desc: mutDesc.SchemaDesc(),
532-
}
533-
}
534-
535-
return schemaMetadata, err
536-
}
537-
538395
// bindTableDescImportProperties updates the table descriptor at the start of an
539396
// import for a table that existed before the import.
540397
func bindTableDescImportProperties(
@@ -643,50 +500,6 @@ func (r *importResumer) publishTables(
643500
return nil
644501
}
645502

646-
// publishSchemas updates the status of imported schemas from OFFLINE to PUBLIC.
647-
func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.ExecutorConfig) error {
648-
details := r.job.Details().(jobspb.ImportDetails)
649-
// Schemas should only be published once.
650-
if details.SchemasPublished {
651-
return nil
652-
}
653-
log.Event(ctx, "making schemas live")
654-
655-
return sql.DescsTxn(ctx, execCfg, func(
656-
ctx context.Context, txn isql.Txn, descsCol *descs.Collection,
657-
) error {
658-
b := txn.KV().NewBatch()
659-
for _, schema := range details.Schemas {
660-
newDesc, err := descsCol.MutableByID(txn.KV()).Desc(ctx, schema.Desc.GetID())
661-
if err != nil {
662-
return err
663-
}
664-
newSchemaDesc, ok := newDesc.(*schemadesc.Mutable)
665-
if !ok {
666-
return errors.Newf("expected schema descriptor with ID %v, got %v",
667-
schema.Desc.GetID(), newDesc)
668-
}
669-
newSchemaDesc.SetPublic()
670-
if err := descsCol.WriteDescToBatch(
671-
ctx, false /* kvTrace */, newSchemaDesc, b,
672-
); err != nil {
673-
return errors.Wrapf(err, "publishing schema %d", newSchemaDesc.ID)
674-
}
675-
}
676-
if err := txn.KV().Run(ctx, b); err != nil {
677-
return errors.Wrap(err, "publishing schemas")
678-
}
679-
680-
// Update job record to mark tables published state as complete.
681-
details.SchemasPublished = true
682-
err := r.job.WithTxn(txn).SetDetails(ctx, details)
683-
if err != nil {
684-
return errors.Wrap(err, "updating job details after publishing schemas")
685-
}
686-
return nil
687-
})
688-
}
689-
690503
// checkVirtualConstraints checks constraints that are enforced via runtime
691504
// checks, such as uniqueness checks that are not directly backed by an index.
692505
func (r *importResumer) checkVirtualConstraints(
@@ -1161,7 +974,7 @@ func (r *importResumer) dropSchemas(
1161974
// If the prepare step of the import job was not completed then the
1162975
// descriptors do not need to be rolled back as the txn updating them never
1163976
// completed.
1164-
if !details.PrepareComplete || len(details.Schemas) == 0 {
977+
if !details.PrepareComplete {
1165978
return nil, nil
1166979
}
1167980

@@ -1177,70 +990,13 @@ func (r *importResumer) dropSchemas(
1177990
details.ParentID)
1178991
}
1179992

1180-
droppedSchemaIDs := make([]descpb.ID, 0)
1181-
for _, schema := range details.Schemas {
1182-
desc, err := descsCol.MutableByID(txn.KV()).Desc(ctx, schema.Desc.ID)
1183-
if err != nil {
1184-
return nil, err
1185-
}
1186-
var schemaDesc *schemadesc.Mutable
1187-
var ok bool
1188-
if schemaDesc, ok = desc.(*schemadesc.Mutable); !ok {
1189-
return nil, errors.Newf("unable to resolve schema desc with ID %d", schema.Desc.ID)
1190-
}
1191-
1192-
// Mark the descriptor as dropped and write it to the batch.
1193-
// Delete namespace entry or update draining names depending on version.
1194-
schemaDesc.SetDropped()
1195-
droppedSchemaIDs = append(droppedSchemaIDs, schemaDesc.GetID())
1196-
1197-
b := txn.KV().NewBatch()
1198-
if dbDesc.Schemas != nil {
1199-
delete(dbDesc.Schemas, schemaDesc.GetName())
1200-
}
1201-
if err := descsCol.WriteDescToBatch(
1202-
ctx, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), schemaDesc, b,
1203-
); err != nil {
1204-
return nil, err
1205-
}
1206-
if err := descsCol.DeleteNamespaceEntryToBatch(
1207-
ctx, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), schemaDesc, b,
1208-
); err != nil {
1209-
return nil, err
1210-
}
1211-
err = txn.KV().Run(ctx, b)
1212-
if err != nil {
1213-
return nil, err
1214-
}
1215-
}
1216-
1217993
// Write out the change to the database. This only creates a job record to be
1218994
// run after the txn commits.
1219995
queuedJob, err := writeNonDropDatabaseChange(ctx, dbDesc, txn, descsCol, p, "")
1220996
if err != nil {
1221997
return nil, err
1222998
}
1223999

1224-
// Create the job to drop the schema.
1225-
dropSchemaJobRecord := jobs.Record{
1226-
Description: "dropping schemas as part of an import job rollback",
1227-
Username: p.User(),
1228-
DescriptorIDs: droppedSchemaIDs,
1229-
Details: jobspb.SchemaChangeDetails{
1230-
DroppedSchemas: droppedSchemaIDs,
1231-
DroppedDatabaseID: descpb.InvalidID,
1232-
FormatVersion: jobspb.DatabaseJobFormatVersion,
1233-
},
1234-
Progress: jobspb.SchemaChangeProgress{},
1235-
NonCancelable: true,
1236-
}
1237-
jobID := p.ExecCfg().JobRegistry.MakeJobID()
1238-
job, err := execCfg.JobRegistry.CreateJobWithTxn(ctx, dropSchemaJobRecord, jobID, txn)
1239-
if err != nil {
1240-
return nil, err
1241-
}
1242-
queuedJob = append(queuedJob, job.ID())
1243-
12441000
return queuedJob, nil
12451001
}
12461002

0 commit comments

Comments
 (0)