Skip to content

Commit bac4c79

Browse files
craig[bot]Xiang-Gu
andcommitted
107504: sql: Add observability when schema change jobs are blocked by concurrent ones r=Xiang-Gu a=Xiang-Gu Previously, when using legacy schema changes, a concurrent schema change would wait for preceding ones if it was not first in line, but we did not provide the blocking schema change job ID in either the log or the error. This was inadequate for debugging and troubleshooting support tickets. This commit fixes that by providing the blocking schema change job ID when this happens. Fix cockroachdb#103733 Release note (sql): When there are concurrent legacy schema change jobs, we will be able to know what the blocking schema change job ID is from the relevant log entries. Co-authored-by: Xiang Gu <[email protected]>
2 parents 284b6c0 + da746bf commit bac4c79

File tree

12 files changed

+233
-6
lines changed

12 files changed

+233
-6
lines changed

pkg/sql/catalog/dbdesc/database_desc.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
466466
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID
467467
}
468468

469+
// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
470+
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
471+
if desc.DeclarativeSchemaChangerState != nil &&
472+
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
473+
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
474+
}
475+
return ret
476+
}
477+
469478
// GetDefaultPrivilegeDescriptor returns a DefaultPrivilegeDescriptor.
470479
func (desc *immutable) GetDefaultPrivilegeDescriptor() catalog.DefaultPrivilegeDescriptor {
471480
defaultPrivilegeDescriptor := desc.GetDefaultPrivileges()

pkg/sql/catalog/descriptor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ type Descriptor interface {
246246
// in progress, either legacy or declarative.
247247
HasConcurrentSchemaChanges() bool
248248

249+
// ConcurrentSchemaChangeJobIDs returns all in-progress schema change
250+
// jobs, either legacy or declarative.
251+
ConcurrentSchemaChangeJobIDs() []catpb.JobID
252+
249253
// SkipNamespace is true when a descriptor should not have a namespace record.
250254
SkipNamespace() bool
251255

pkg/sql/catalog/funcdesc/func_desc.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
394394
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID
395395
}
396396

397+
// ConcurrentSchemaChangeJobIDs implements the catalog.Descriptor interface.
398+
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
399+
if desc.DeclarativeSchemaChangerState != nil &&
400+
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
401+
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
402+
}
403+
return ret
404+
}
405+
397406
// SkipNamespace implements the catalog.Descriptor interface.
398407
func (desc *immutable) SkipNamespace() bool {
399408
return true

pkg/sql/catalog/schemadesc/schema_desc.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
336336
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID
337337
}
338338

339+
// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
340+
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
341+
if desc.DeclarativeSchemaChangerState != nil &&
342+
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
343+
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
344+
}
345+
return ret
346+
}
347+
339348
// MaybeIncrementVersion implements the MutableDescriptor interface.
340349
func (desc *Mutable) MaybeIncrementVersion() {
341350
// Already incremented, no-op.

pkg/sql/catalog/schemadesc/synthetic_schema_desc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ func (p synthetic) HasConcurrentSchemaChanges() bool {
111111
return false
112112
}
113113

114+
// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
115+
func (p synthetic) ConcurrentSchemaChangeJobIDs() []catpb.JobID {
116+
return nil
117+
}
118+
114119
// SkipNamespace implements the descriptor interface.
115120
// We never store synthetic descriptors.
116121
func (p synthetic) SkipNamespace() bool {

pkg/sql/catalog/tabledesc/table_desc.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ func (desc *wrapper) HasConcurrentSchemaChanges() bool {
7373
len(desc.MutationJobs) > 0
7474
}
7575

76+
// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
77+
func (desc *wrapper) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
78+
if desc.DeclarativeSchemaChangerState != nil &&
79+
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
80+
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
81+
}
82+
if len(desc.MutationJobs) > 0 {
83+
for _, mutationJob := range desc.MutationJobs {
84+
ret = append(ret, mutationJob.JobID)
85+
}
86+
}
87+
return ret
88+
}
89+
7690
// SkipNamespace implements the descriptor interface.
7791
func (desc *wrapper) SkipNamespace() bool {
7892
// Virtual tables are hard-coded and don't have entries in the

pkg/sql/catalog/typedesc/table_implicit_record_type.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,11 @@ func (v *tableImplicitRecordType) HasConcurrentSchemaChanges() bool {
281281
return false
282282
}
283283

284+
// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
285+
func (v *tableImplicitRecordType) ConcurrentSchemaChangeJobIDs() []catpb.JobID {
286+
return nil
287+
}
288+
284289
// SkipNamespace implements catalog.Descriptor. We never store table implicit
285290
// record type which is always constructed in memory.
286291
func (v *tableImplicitRecordType) SkipNamespace() bool {

pkg/sql/catalog/typedesc/type_desc.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,15 @@ func (desc *immutable) HasConcurrentSchemaChanges() bool {
872872
return false
873873
}
874874

875+
// ConcurrentSchemaChangeJobIDs implements catalog.Descriptor.
876+
func (desc *immutable) ConcurrentSchemaChangeJobIDs() (ret []catpb.JobID) {
877+
if desc.DeclarativeSchemaChangerState != nil &&
878+
desc.DeclarativeSchemaChangerState.JobID != catpb.InvalidJobID {
879+
ret = append(ret, desc.DeclarativeSchemaChangerState.JobID)
880+
}
881+
return ret
882+
}
883+
875884
// SkipNamespace implements the descriptor interface.
876885
func (desc *immutable) SkipNamespace() bool {
877886
return false

pkg/sql/schema_change_plan_node.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/security/username"
2222
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
2323
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
24+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
2425
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2526
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2627
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
@@ -202,6 +203,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
202203
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
203204
now := p.ExecCfg().Clock.Now()
204205
var isBlocked bool
206+
var blockingJobIDs []catpb.JobID
205207
if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(
206208
ctx context.Context, txn descs.Txn,
207209
) error {
@@ -213,6 +215,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
213215
return err
214216
}
215217
isBlocked = desc.HasConcurrentSchemaChanges()
218+
blockingJobIDs = desc.ConcurrentSchemaChangeJobIDs()
216219
return nil
217220
}); err != nil {
218221
return err
@@ -222,8 +225,8 @@ func (p *planner) waitForDescriptorSchemaChanges(
222225
}
223226
if logEvery.ShouldLog() {
224227
log.Infof(ctx,
225-
"schema change waiting for concurrent schema changes on descriptor %d,"+
226-
" waited %v so far", descID, timeutil.Since(start),
228+
"schema change waiting for %v concurrent schema change job(s) %v on descriptor %d,"+
229+
" waited %v so far", len(blockingJobIDs), blockingJobIDs, descID, timeutil.Since(start),
227230
)
228231
}
229232
if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil {

pkg/sql/schema_changer.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,14 +584,29 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri
584584
// descriptor, it seems possible for a job to be resumed after the mutation
585585
// has already been removed. If there's a mutation provided, we should check
586586
// whether it actually exists on the table descriptor and exit the job if not.
587-
for i, mutation := range tableDesc.AllMutations() {
587+
allMutations := tableDesc.AllMutations()
588+
for i, mutation := range allMutations {
588589
if mutation.MutationID() == sc.mutationID {
589590
if i != 0 {
591+
blockingJobIDsAsSet := make(map[catpb.JobID]struct{})
592+
for j := 0; j < i; j++ {
593+
blockingJobID, err := mustGetJobIDWithMutationID(tableDesc, allMutations[j].MutationID())
594+
if err != nil {
595+
return err
596+
}
597+
blockingJobIDsAsSet[blockingJobID] = struct{}{}
598+
}
599+
blockingJobIDs := make([]catpb.JobID, 0, len(blockingJobIDsAsSet))
600+
for jobID := range blockingJobIDsAsSet {
601+
blockingJobIDs = append(blockingJobIDs, jobID)
602+
}
590603
log.Infof(ctx,
591-
"schema change on %q (v%d): another change is still in progress",
592-
desc.GetName(), desc.GetVersion(),
604+
"schema change on %q (v%d): another %v schema change job(s) %v is still in progress "+
605+
"and it is blocking this job from proceeding",
606+
desc.GetName(), desc.GetVersion(), len(blockingJobIDs), blockingJobIDs,
593607
)
594-
return errSchemaChangeNotFirstInLine
608+
return errors.Wrapf(errSchemaChangeNotFirstInLine, "schema change is "+
609+
"blocked by %v other schema change job(s) %v", len(blockingJobIDs), blockingJobIDs)
595610
}
596611
break
597612
}
@@ -600,6 +615,21 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri
600615
return nil
601616
}
602617

618+
func mustGetJobIDWithMutationID(
619+
tableDesc catalog.TableDescriptor, mutationID descpb.MutationID,
620+
) (jobID catpb.JobID, err error) {
621+
for _, mutationJob := range tableDesc.GetMutationJobs() {
622+
if mutationJob.MutationID == mutationID {
623+
jobID = mutationJob.JobID
624+
}
625+
}
626+
if jobID == catpb.InvalidJobID {
627+
return 0, errors.AssertionFailedf("mutation job with mutation ID %v is not found in table %q",
628+
mutationID, tableDesc.GetName())
629+
}
630+
return jobID, nil
631+
}
632+
603633
func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descriptor, error) {
604634
// Retrieve the descriptor that is being changed.
605635
var desc catalog.Descriptor

0 commit comments

Comments
 (0)