Skip to content

Commit da746bf

Browse files
committed
sql: Add blocking schema changer job IDs to log entries for concurrent schema change
Previously, we had log entries that says a schema change job is waiting for concurrent schema change jobs without saying what those blocking schema change jobs are. This is inadequate for troubleshooting and debugging. This commit fixes it. Note that because we have both legacy and declarative schema changer, there are two paths of handling concurrent schema changes: 1. legacy ongoing, another legacy comes in: The second, legacy schema changer will go through, create a job if needed, and the job will wait for the previous legacy schema changer job to finish before proceeding. This commit provides all schema change jobs before the current one that are blocking. 2. legacy + declarative, declarative + declarative, or declarative + legacy: in all three cases, the second schema change will return an error back to its upstream -- conn_executor -- and we wait for all concurrent schema change jobs to finish in a retry loop in the conn_executor. We also provides all the concurrent schema change job IDs that the conn_executor is waiting in this commit. Release note: None
1 parent fac0930 commit da746bf

File tree

4 files changed

+169
-6
lines changed

4 files changed

+169
-6
lines changed

pkg/sql/schema_change_plan_node.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/security/username"
2222
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
2323
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
24+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
2425
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2526
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2627
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
@@ -203,6 +204,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
203204
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
204205
now := p.ExecCfg().Clock.Now()
205206
var isBlocked bool
207+
var blockingJobIDs []catpb.JobID
206208
if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(
207209
ctx context.Context, txn descs.Txn,
208210
) error {
@@ -214,6 +216,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
214216
return err
215217
}
216218
isBlocked = desc.HasConcurrentSchemaChanges()
219+
blockingJobIDs = desc.ConcurrentSchemaChangeJobIDs()
217220
return nil
218221
}); err != nil {
219222
return err
@@ -225,8 +228,8 @@ func (p *planner) waitForDescriptorSchemaChanges(
225228
}
226229
if logEvery.ShouldLog() {
227230
log.Infof(ctx,
228-
"schema change waiting for concurrent schema changes on descriptor %d,"+
229-
" waited %v so far", descID, timeutil.Since(start),
231+
"schema change waiting for %v concurrent schema change job(s) %v on descriptor %d,"+
232+
" waited %v so far", len(blockingJobIDs), blockingJobIDs, descID, timeutil.Since(start),
230233
)
231234
}
232235
}

pkg/sql/schema_changer.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,14 +584,29 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri
584584
// descriptor, it seems possible for a job to be resumed after the mutation
585585
// has already been removed. If there's a mutation provided, we should check
586586
// whether it actually exists on the table descriptor and exit the job if not.
587-
for i, mutation := range tableDesc.AllMutations() {
587+
allMutations := tableDesc.AllMutations()
588+
for i, mutation := range allMutations {
588589
if mutation.MutationID() == sc.mutationID {
589590
if i != 0 {
591+
blockingJobIDsAsSet := make(map[catpb.JobID]struct{})
592+
for j := 0; j < i; j++ {
593+
blockingJobID, err := mustGetJobIDWithMutationID(tableDesc, allMutations[j].MutationID())
594+
if err != nil {
595+
return err
596+
}
597+
blockingJobIDsAsSet[blockingJobID] = struct{}{}
598+
}
599+
blockingJobIDs := make([]catpb.JobID, 0, len(blockingJobIDsAsSet))
600+
for jobID := range blockingJobIDsAsSet {
601+
blockingJobIDs = append(blockingJobIDs, jobID)
602+
}
590603
log.Infof(ctx,
591-
"schema change on %q (v%d): another change is still in progress",
592-
desc.GetName(), desc.GetVersion(),
604+
"schema change on %q (v%d): another %v schema change job(s) %v is still in progress "+
605+
"and it is blocking this job from proceeding",
606+
desc.GetName(), desc.GetVersion(), len(blockingJobIDs), blockingJobIDs,
593607
)
594-
return errSchemaChangeNotFirstInLine
608+
return errors.Wrapf(errSchemaChangeNotFirstInLine, "schema change is "+
609+
"blocked by %v other schema change job(s) %v", len(blockingJobIDs), blockingJobIDs)
595610
}
596611
break
597612
}
@@ -600,6 +615,21 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri
600615
return nil
601616
}
602617

618+
func mustGetJobIDWithMutationID(
619+
tableDesc catalog.TableDescriptor, mutationID descpb.MutationID,
620+
) (jobID catpb.JobID, err error) {
621+
for _, mutationJob := range tableDesc.GetMutationJobs() {
622+
if mutationJob.MutationID == mutationID {
623+
jobID = mutationJob.JobID
624+
}
625+
}
626+
if jobID == catpb.InvalidJobID {
627+
return 0, errors.AssertionFailedf("mutation job with mutation ID %v is not found in table %q",
628+
mutationID, tableDesc.GetName())
629+
}
630+
return jobID, nil
631+
}
632+
603633
func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descriptor, error) {
604634
// Retrieve the descriptor that is being changed.
605635
var desc catalog.Descriptor

pkg/sql/schema_changer_helpers_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,18 @@ package sql
1212

1313
import (
1414
"context"
15+
"regexp"
16+
"sort"
17+
"strings"
1518
"testing"
1619

1720
"github.com/cockroachdb/cockroach/pkg/jobs"
1821
"github.com/cockroachdb/cockroach/pkg/roachpb"
1922
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
2023
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
24+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
2125
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
26+
"github.com/cockroachdb/errors"
2227
"github.com/stretchr/testify/require"
2328
)
2429

@@ -88,3 +93,89 @@ func TestCalculateSplitAtShards(t *testing.T) {
8893
})
8994
}
9095
}
96+
97+
// TestNotFirstInLine tests that if a schema change's mutation is not first in
98+
// line, the error message clearly state what the blocking schema change job ID
99+
// is.
100+
func TestNotFirstInLine(t *testing.T) {
101+
defer leaktest.AfterTest(t)()
102+
103+
// A helper to extract blocking schema change job IDs from the error message
104+
// which is of the form "schema change is blocked by x other schema change
105+
// job(s) [yyy zzz]".
106+
extractSortedBlockingJobIDsFromNotFirstInLineErr := func(errMsg string) []string {
107+
p := regexp.MustCompile(`\[(.*?)\]`)
108+
m := p.FindStringSubmatch(errMsg)
109+
require.NotNilf(t, m, "did not find any blocking job IDs")
110+
blockingJobIDs := strings.Fields(m[1])
111+
sort.Slice(blockingJobIDs, func(i, j int) bool {
112+
return blockingJobIDs[i] <= blockingJobIDs[j]
113+
})
114+
return blockingJobIDs
115+
}
116+
117+
ctx := context.Background()
118+
desc := descpb.TableDescriptor{
119+
Name: "t",
120+
ID: 104,
121+
Mutations: []descpb.DescriptorMutation{
122+
{
123+
Descriptor_: &descpb.DescriptorMutation_Index{},
124+
MutationID: 1,
125+
},
126+
{
127+
Descriptor_: &descpb.DescriptorMutation_Index{},
128+
MutationID: 1,
129+
},
130+
{
131+
Descriptor_: &descpb.DescriptorMutation_Column{},
132+
MutationID: 2,
133+
},
134+
{
135+
Descriptor_: &descpb.DescriptorMutation_Column{},
136+
MutationID: 3,
137+
},
138+
},
139+
MutationJobs: []descpb.TableDescriptor_MutationJob{
140+
{
141+
JobID: 11111,
142+
MutationID: 1,
143+
},
144+
{
145+
JobID: 22222,
146+
MutationID: 2,
147+
},
148+
{
149+
JobID: 33333,
150+
MutationID: 3,
151+
},
152+
},
153+
}
154+
mut := tabledesc.NewBuilder(&desc).BuildExistingMutableTable()
155+
{
156+
sc := SchemaChanger{
157+
descID: 104,
158+
mutationID: 1,
159+
}
160+
err := sc.notFirstInLine(ctx, mut)
161+
require.NoError(t, err)
162+
}
163+
{
164+
sc := SchemaChanger{
165+
descID: 104,
166+
mutationID: 2,
167+
}
168+
err := sc.notFirstInLine(ctx, mut)
169+
require.True(t, errors.Is(err, errSchemaChangeNotFirstInLine))
170+
require.Equal(t, []string{"11111"}, extractSortedBlockingJobIDsFromNotFirstInLineErr(err.Error()))
171+
}
172+
{
173+
sc := SchemaChanger{
174+
descID: 104,
175+
mutationID: 3,
176+
}
177+
err := sc.notFirstInLine(ctx, mut)
178+
require.True(t, errors.Is(err, errSchemaChangeNotFirstInLine))
179+
require.Equal(t, []string{"11111", "22222"}, extractSortedBlockingJobIDsFromNotFirstInLineErr(err.Error()))
180+
}
181+
}

pkg/sql/schema_changer_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"database/sql/driver"
1717
"fmt"
1818
"math/rand"
19+
"regexp"
1920
"strconv"
2021
"strings"
2122
"sync"
@@ -8389,3 +8390,41 @@ SELECT fraction_completed > 0
83898390
}
83908391
}
83918392
}
8393+
8394+
// TestLegacySchemaChangerWaitsForOtherSchemaChanges tests concurrent legacy schema changes
8395+
// wait properly for preceding ones if it's not first in line.
8396+
func TestLegacySchemaChangerWaitsForOtherSchemaChanges(t *testing.T) {
8397+
defer leaktest.AfterTest(t)()
8398+
defer log.Scope(t).Close(t)
8399+
ctx := context.Background()
8400+
params, _ := tests.CreateTestServerParams()
8401+
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
8402+
8403+
s, sqlDB, _ := serverutils.StartServer(t, params)
8404+
defer s.Stopper().Stop(ctx)
8405+
tdb := sqlutils.MakeSQLRunner(sqlDB)
8406+
8407+
tdb.Exec(t, `SET use_declarative_schema_changer = off`)
8408+
tdb.Exec(t, `CREATE TABLE t (i INT PRIMARY KEY);`)
8409+
tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec';`)
8410+
8411+
pattern, err := regexp.Compile(`\d+`)
8412+
require.NoError(t, err)
8413+
_, err = sqlDB.Exec(`CREATE INDEX idx ON t (i);`)
8414+
jobID1 := pattern.FindString(err.Error())
8415+
require.NotEmpty(t, jobID1)
8416+
_, err = sqlDB.Exec(`ALTER TABLE t ADD COLUMN j INT DEFAULT 30;`)
8417+
jobID2 := pattern.FindString(err.Error())
8418+
require.NotEmpty(t, jobID2)
8419+
8420+
tdb.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = '';`)
8421+
tdb.Exec(t, `RESUME JOB $1`, jobID2)
8422+
tdb.Exec(t, `RESUME JOB $1`, jobID1)
8423+
testutils.SucceedsSoon(t, func() error {
8424+
res := tdb.QueryStr(t, `SELECT status FROM [SHOW JOBS] WHERE job_id in ($1, $2)`, jobID1, jobID2)
8425+
if len(res) == 2 && res[0][0] == "succeeded" && res[1][0] == "succeeded" {
8426+
return nil
8427+
}
8428+
return errors.New("")
8429+
})
8430+
}

0 commit comments

Comments
 (0)