Skip to content

Commit d83d1d5

Browse files
craig[bot]fqazi
andcommitted
Merge #156867
156867: sql/schemachanger: flush batches while making metadata changes r=fqazi a=fqazi Previously, the declarative schema changer only flushed batches in a transaction at the end of a stage. This could lead to command sizes that are too large prevent large truncates from completing. This patch, flushes the batches once they are above a certain size. Fixes: #156682 Release note (bug fix): Addressed a bug that prevents large TRUNCATE operations from completing due to `command is too large` errors. Co-authored-by: Faizan Qazi <[email protected]>
2 parents bd8f45a + 2d04977 commit d83d1d5

File tree

7 files changed

+140
-21
lines changed

7 files changed

+140
-21
lines changed

pkg/sql/drop_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1552,6 +1552,47 @@ func TestDropLargeDatabaseWithDeclarativeSchemaChanger(t *testing.T) {
15521552
true)
15531553
}
15541554

1555+
// TestTruncateLarge truncates a large number of tables in a single transaction.
1556+
func TestTruncateLarge(t *testing.T) {
1557+
defer leaktest.AfterTest(t)()
1558+
defer log.Scope(t).Close(t)
1559+
skip.UnderDuress(t, "truncating a large number of tables")
1560+
1561+
testutils.RunTrueAndFalse(t, "batch limit set", func(t *testing.T, batchLimitSet bool) {
1562+
ctx := context.Background()
1563+
s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{})
1564+
defer s.Stopper().Stop(ctx)
1565+
sqlDB := sqlutils.MakeSQLRunner(conn)
1566+
createCommand := strings.Builder{}
1567+
truncateCommand := strings.Builder{}
1568+
sqlDB.Exec(t, "SET CLUSTER SETTING kv.raft.command.max_size='5m'")
1569+
if batchLimitSet {
1570+
sqlDB.Exec(t, "SET CLUSTER SETTING sql.schema_changer.batch_flush_threshold_size='2m'")
1571+
}
1572+
// Generate the truncate and create table commands.
1573+
const numTables = 500
1574+
truncateCommand.WriteString("TRUNCATE TABLE ")
1575+
for i := range numTables {
1576+
createCommand.WriteString(fmt.Sprintf("CREATE TABLE t%d (a INT PRIMARY KEY, j INT, k INT, INDEX (j), INDEX (k), UNIQUE (j, k));\n", i))
1577+
truncateCommand.WriteString(fmt.Sprintf("t%d", i))
1578+
if i != numTables-1 {
1579+
truncateCommand.WriteString(", ")
1580+
}
1581+
}
1582+
// Execute the create commands first.
1583+
sqlDB.Exec(t, createCommand.String())
1584+
1585+
// The default limit is larger than our reduced raft command size, so if the
1586+
// limit is not modified, the truncate command will fail.
1587+
if batchLimitSet {
1588+
sqlDB.Exec(t, truncateCommand.String())
1589+
} else {
1590+
sqlDB.ExpectErr(t, "command is too large", truncateCommand.String())
1591+
}
1592+
})
1593+
1594+
}
1595+
15551596
func BenchmarkDropLargeDatabaseWithGenerateTestObjects(b *testing.B) {
15561597
defer leaktest.AfterTest(b)()
15571598
defer log.Scope(b).Close(b)

pkg/sql/schemachanger/scdeps/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ go_library(
1919
"//pkg/roachpb",
2020
"//pkg/security/username",
2121
"//pkg/server/telemetry",
22+
"//pkg/settings",
2223
"//pkg/settings/cluster",
2324
"//pkg/sql/catalog",
2425
"//pkg/sql/catalog/catalogkeys",

pkg/sql/schemachanger/scdeps/exec_deps.go

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
1919
"github.com/cockroachdb/cockroach/pkg/security/username"
2020
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
21+
"github.com/cockroachdb/cockroach/pkg/settings"
2122
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2223
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
2324
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
@@ -35,6 +36,17 @@ import (
3536
"github.com/cockroachdb/errors"
3637
)
3738

39+
// batchFlushThresholdSize is the size of the metadata batch that,
40+
// when exceeded, causes the schema changer to flush the batch to the KV store.
41+
var batchFlushThresholdSize = settings.RegisterByteSizeSetting(
42+
settings.ApplicationLevel,
43+
"sql.schema_changer.batch_flush_threshold_size",
44+
"maximum size in bytes of the schema changer's metadata batch before it's flushed to the KV store. "+
45+
"This setting should be smaller or equal to kv.raft.command.max_size",
46+
32*1024*1024,
47+
settings.IntInRange(1024*1024, 512*1024*1024),
48+
)
49+
3850
// JobRegistry implements the methods the schema changer needs from the
3951
// job registry. Outside of tests this should always be backed by *job.Registry.
4052
type JobRegistry interface {
@@ -206,22 +218,38 @@ func (d *txnDeps) MustReadMutableDescriptor(
206218
func (d *txnDeps) CreateOrUpdateDescriptor(
207219
ctx context.Context, desc catalog.MutableDescriptor,
208220
) error {
209-
return d.descsCollection.WriteDescToBatch(ctx, d.kvTrace, desc, d.getOrCreateBatch())
221+
b, err := d.getOrCreateBatch(ctx)
222+
if err != nil {
223+
return err
224+
}
225+
return d.descsCollection.WriteDescToBatch(ctx, d.kvTrace, desc, b)
210226
}
211227

212228
// DeleteName implements the scexec.Catalog interface.
213229
func (d *txnDeps) DeleteName(ctx context.Context, nameInfo descpb.NameInfo, id descpb.ID) error {
214-
return d.descsCollection.DeleteNamespaceEntryToBatch(ctx, d.kvTrace, &nameInfo, d.getOrCreateBatch())
230+
b, err := d.getOrCreateBatch(ctx)
231+
if err != nil {
232+
return err
233+
}
234+
return d.descsCollection.DeleteNamespaceEntryToBatch(ctx, d.kvTrace, &nameInfo, b)
215235
}
216236

217237
// AddName implements the scexec.Catalog interface.
218238
func (d *txnDeps) AddName(ctx context.Context, nameInfo descpb.NameInfo, id descpb.ID) error {
219-
return d.descsCollection.InsertNamespaceEntryToBatch(ctx, d.kvTrace, &nameEntry{nameInfo, id}, d.getOrCreateBatch())
239+
b, err := d.getOrCreateBatch(ctx)
240+
if err != nil {
241+
return err
242+
}
243+
return d.descsCollection.InsertNamespaceEntryToBatch(ctx, d.kvTrace, &nameEntry{nameInfo, id}, b)
220244
}
221245

222246
// DeleteDescriptor implements the scexec.Catalog interface.
223247
func (d *txnDeps) DeleteDescriptor(ctx context.Context, id descpb.ID) error {
224-
return d.descsCollection.DeleteDescToBatch(ctx, d.kvTrace, id, d.getOrCreateBatch())
248+
b, err := d.getOrCreateBatch(ctx)
249+
if err != nil {
250+
return err
251+
}
252+
return d.descsCollection.DeleteDescToBatch(ctx, d.kvTrace, id, b)
225253
}
226254

227255
// GetZoneConfig implements the scexec.Catalog interface.
@@ -237,7 +265,11 @@ func (d *txnDeps) GetZoneConfig(ctx context.Context, id descpb.ID) (catalog.Zone
237265
func (d *txnDeps) WriteZoneConfigToBatch(
238266
ctx context.Context, id descpb.ID, zc catalog.ZoneConfig,
239267
) error {
240-
err := d.descsCollection.WriteZoneConfigToBatch(ctx, d.kvTrace, d.getOrCreateBatch(), id, zc)
268+
b, err := d.getOrCreateBatch(ctx)
269+
if err != nil {
270+
return err
271+
}
272+
err = d.descsCollection.WriteZoneConfigToBatch(ctx, d.kvTrace, b, id, zc)
241273
if err != nil {
242274
return err
243275
}
@@ -260,7 +292,11 @@ func (d *txnDeps) UpdateZoneConfig(ctx context.Context, id descpb.ID, zc *zonepb
260292
rawBytes = oldZc.GetRawBytesInStorage()
261293
}
262294
newZc = zone.NewZoneConfigWithRawBytes(zc, rawBytes)
263-
return d.descsCollection.WriteZoneConfigToBatch(ctx, d.kvTrace, d.getOrCreateBatch(), id, newZc)
295+
b, err := d.getOrCreateBatch(ctx)
296+
if err != nil {
297+
return err
298+
}
299+
return d.descsCollection.WriteZoneConfigToBatch(ctx, d.kvTrace, b, id, newZc)
264300
}
265301

266302
// UpdateSubzoneConfig implements the scexec.Catalog interface. Note that this
@@ -310,7 +346,11 @@ func (d *txnDeps) UpdateSubzoneConfig(
310346

311347
// DeleteZoneConfig implements the scexec.Catalog interface.
312348
func (d *txnDeps) DeleteZoneConfig(ctx context.Context, id descpb.ID) error {
313-
return d.descsCollection.DeleteZoneConfigInBatch(ctx, d.kvTrace, d.getOrCreateBatch(), id)
349+
b, err := d.getOrCreateBatch(ctx)
350+
if err != nil {
351+
return err
352+
}
353+
return d.descsCollection.DeleteZoneConfigInBatch(ctx, d.kvTrace, b, id)
314354
}
315355

316356
// DeleteSubzoneConfig implements the scexec.Catalog interface.
@@ -344,7 +384,11 @@ func (d *txnDeps) DeleteSubzoneConfig(
344384
zc.DeleteSubzoneSpans(subzoneSpans)
345385

346386
newZc = zone.NewZoneConfigWithRawBytes(zc, rawBytes)
347-
return d.descsCollection.WriteZoneConfigToBatch(ctx, d.kvTrace, d.getOrCreateBatch(),
387+
b, err := d.getOrCreateBatch(ctx)
388+
if err != nil {
389+
return err
390+
}
391+
return d.descsCollection.WriteZoneConfigToBatch(ctx, d.kvTrace, b,
348392
tableID, newZc)
349393
}
350394

@@ -369,10 +413,14 @@ func (d *txnDeps) Run(ctx context.Context) error {
369413
}
370414

371415
// InitializeSequence implements the scexec.Caatalog interface.
372-
func (d *txnDeps) InitializeSequence(id descpb.ID, startVal int64) {
373-
batch := d.getOrCreateBatch()
416+
func (d *txnDeps) InitializeSequence(ctx context.Context, id descpb.ID, startVal int64) error {
417+
batch, err := d.getOrCreateBatch(ctx)
418+
if err != nil {
419+
return err
420+
}
374421
sequenceKey := d.codec.SequenceKey(uint32(id))
375422
batch.Inc(sequenceKey, startVal)
423+
return nil
376424
}
377425

378426
// CheckMaxSchemaObjects implements the scexec.Catalog interface.
@@ -394,21 +442,45 @@ func (d *txnDeps) Reset(ctx context.Context) error {
394442
return nil
395443
}
396444

397-
func (d *txnDeps) getOrCreateBatch() *kv.Batch {
445+
// maybeFlushBatch flushes the current batch if it exceeds the maximum size.
446+
func (d *txnDeps) maybeFlushBatch(ctx context.Context) error {
447+
if int64(d.batch.ApproximateMutationBytes()) > batchFlushThresholdSize.Get(&d.settings.SV) {
448+
if err := d.Run(ctx); err != nil {
449+
return err
450+
}
451+
d.batch = d.txn.KV().NewBatch()
452+
}
453+
return nil
454+
}
455+
456+
func (d *txnDeps) getOrCreateBatch(ctx context.Context) (*kv.Batch, error) {
398457
if d.batch == nil {
399458
d.batch = d.txn.KV().NewBatch()
459+
} else {
460+
// Otherwise, flush the batch if its too big.
461+
if err := d.maybeFlushBatch(ctx); err != nil {
462+
return nil, err
463+
}
400464
}
401-
return d.batch
465+
return d.batch, nil
402466
}
403467

404468
// UpdateComment implements the scexec.Catalog interface.
405469
func (d *txnDeps) UpdateComment(ctx context.Context, key catalogkeys.CommentKey, cmt string) error {
406-
return d.descsCollection.WriteCommentToBatch(ctx, d.kvTrace, d.getOrCreateBatch(), key, cmt)
470+
b, err := d.getOrCreateBatch(ctx)
471+
if err != nil {
472+
return err
473+
}
474+
return d.descsCollection.WriteCommentToBatch(ctx, d.kvTrace, b, key, cmt)
407475
}
408476

409477
// DeleteComment implements the scexec.Catalog interface.
410478
func (d *txnDeps) DeleteComment(ctx context.Context, key catalogkeys.CommentKey) error {
411-
return d.descsCollection.DeleteCommentInBatch(ctx, d.kvTrace, d.getOrCreateBatch(), key)
479+
b, err := d.getOrCreateBatch(ctx)
480+
if err != nil {
481+
return err
482+
}
483+
return d.descsCollection.DeleteCommentInBatch(ctx, d.kvTrace, b, key)
412484
}
413485

414486
var _ scexec.TransactionalJobRegistry = (*txnDeps)(nil)

pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1543,8 +1543,9 @@ func getNameEntryDescriptorType(parentID, parentSchemaID descpb.ID) string {
15431543
}
15441544

15451545
// InitializeSequence is part of the scexec.Catalog interface.
1546-
func (s *TestState) InitializeSequence(id descpb.ID, startVal int64) {
1546+
func (s *TestState) InitializeSequence(ctx context.Context, id descpb.ID, startVal int64) error {
15471547
s.LogSideEffectf("initializing sequence %d with starting value of %d", id, startVal)
1548+
return nil
15481549
}
15491550

15501551
// CheckMaxSchemaObjects is part of the scexec.Catalog interface.

pkg/sql/schemachanger/scexec/dependencies.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ type Catalog interface {
118118
Reset(ctx context.Context) error
119119

120120
// InitializeSequence initializes the initial value for a sequence.
121-
InitializeSequence(id descpb.ID, startVal int64)
121+
InitializeSequence(ctx context.Context, id descpb.ID, startVal int64) error
122122

123123
// CheckMaxSchemaObjects checks if the number of schema objects in the
124124
// cluster plus the new objects being created would exceed the configured

pkg/sql/schemachanger/scexec/exec_immediate_mutation.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,9 @@ func (s *immediateState) exec(ctx context.Context, c Catalog) error {
272272
}
273273
}
274274
for _, s := range s.sequencesToInit {
275-
c.InitializeSequence(s.id, s.startVal)
275+
if err := c.InitializeSequence(ctx, s.id, s.startVal); err != nil {
276+
return err
277+
}
276278
}
277279
for tempIdxId, tempIdxToRegister := range s.temporarySchemasToRegister {
278280
c.InsertTemporarySchema(tempIdxToRegister.schemaName, tempIdxToRegister.parentID, tempIdxId)

pkg/sql/schemachanger/scexec/mocks_generated_test.go

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)