Skip to content

Commit b07e9c2

Browse files
craig[bot] and yuzefovich
committed
Merge #148549
148549: sql: fix COPY with some concurrent schema changes r=yuzefovich a=yuzefovich Previously, if we had COPY running concurrently with some schema changes that require usage of Puts (via ForcePut on the index), like ALTER PRIMARY KEY, we could hit an internal error because the vectorized encoder didn't support it. This commit adds the support for it, but it also actually disables using the vectorized encoder for COPY when ForcePut option is observed on at least one writable index. This is the case since I observed many different corruption-like failures when stressing the extended test, which aren't present if we use the row-by-row insert. Existing `TestLargeCopy` has been extended so that sometimes it also performs ALTER PRIMARY KEY concurrently with the COPY as well as randomized `copy_from_atomic_enabled` value. Note that in some cases we're hitting "duplicate key" violation while there is a schema change, and it seems somewhat expected to me, so I didn't look deeper and simply made the test ignore such errors. Fixes: #147955. Release note (bug fix): CockroachDB could previously encounter "vector encoder doesn't support ForcePut yet" error when performing COPY command concurrently with some schema changes. The bug has been present since before 23.2 and is now fixed. Co-authored-by: Yahor Yuzefovich <[email protected]>
2 parents daae10b + 3ae33db commit b07e9c2

File tree

4 files changed

+71
-18
lines changed

4 files changed

+71
-18
lines changed

pkg/sql/colenc/encode.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,6 @@ func (b *BatchEncoder) PrepareBatch(ctx context.Context, p row.Putter, start, en
125125
}
126126
for _, ind := range b.rh.TableDesc.WritableNonPrimaryIndexes() {
127127
b.resetBuffers()
128-
// TODO(cucaroach): COPY doesn't need ForcePut support but the encoder
129-
// will need to support it eventually.
130-
if ind.ForcePut() {
131-
colexecerror.InternalError(errors.AssertionFailedf("vector encoder doesn't support ForcePut yet"))
132-
}
133128
if err := b.encodeSecondaryIndex(ctx, ind); err != nil {
134129
return err
135130
}
@@ -484,6 +479,13 @@ func (b *BatchEncoder) encodeSecondaryIndex(ctx context.Context, ind catalog.Ind
484479
return b.checkMemory()
485480
}
486481

482+
func (b *BatchEncoder) useCPutForSecondary(ind catalog.Index) bool {
483+
if ind.ForcePut() {
484+
return false
485+
}
486+
return ind.IsUnique() || b.useCPutsOnNonUniqueIndexes
487+
}
488+
487489
func (b *BatchEncoder) encodeSecondaryIndexNoFamilies(ind catalog.Index, kys []roachpb.Key) error {
488490
for row := 0; row < b.count; row++ {
489491
// Elided partial index keys will be empty.
@@ -510,7 +512,7 @@ func (b *BatchEncoder) encodeSecondaryIndexNoFamilies(ind catalog.Index, kys []r
510512
if err := b.writeColumnValues(kys, values, ind, cols); err != nil {
511513
return err
512514
}
513-
if ind.IsUnique() || b.useCPutsOnNonUniqueIndexes {
515+
if b.useCPutForSecondary(ind) {
514516
b.p.CPutBytesEmpty(kys, values)
515517
} else {
516518
b.p.PutBytes(kys, values)
@@ -576,13 +578,13 @@ func (b *BatchEncoder) encodeSecondaryIndexWithFamilies(
576578
// include encoded primary key columns. For other families,
577579
// use the tuple encoding for the value.
578580
if familyID == 0 {
579-
if ind.IsUnique() || b.useCPutsOnNonUniqueIndexes {
581+
if b.useCPutForSecondary(ind) {
580582
b.p.CPutBytesEmpty(kys, values)
581583
} else {
582584
b.p.PutBytes(kys, values)
583585
}
584586
} else {
585-
if ind.IsUnique() || b.useCPutsOnNonUniqueIndexes {
587+
if b.useCPutForSecondary(ind) {
586588
b.p.CPutTuplesEmpty(kys, values)
587589
} else {
588590
b.p.PutTuples(kys, values)

pkg/sql/colenc/inverted.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (b *BatchEncoder) encodeInvertedSecondaryIndexNoFamiliesOneRow(
9494
}
9595
var kvValue roachpb.Value
9696
kvValue.SetBytes(value)
97-
if ind.IsUnique() || b.useCPutsOnNonUniqueIndexes {
97+
if b.useCPutForSecondary(ind) {
9898
b.p.CPut(&key, &kvValue, nil /* expValue */)
9999
} else {
100100
b.p.Put(&key, &kvValue)

pkg/sql/copy/copy_test.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -704,8 +704,9 @@ func TestLargeCopy(t *testing.T) {
704704
// This test can cause timeouts.
705705
skip.UnderRace(t)
706706
ctx := context.Background()
707+
rng, _ := randutil.NewPseudoRand()
707708

708-
srv, _, kvdb := serverutils.StartServer(t, base.TestServerArgs{})
709+
srv, sqlDB, kvdb := serverutils.StartServer(t, base.TestServerArgs{})
709710
defer srv.Stopper().Stop(ctx)
710711
s := srv.ApplicationLayer()
711712

@@ -719,16 +720,53 @@ func TestLargeCopy(t *testing.T) {
719720

720721
desc := desctestutils.TestingGetPublicTableDescriptor(kvdb, s.Codec(), "defaultdb", "lineitem")
721722
require.NotNil(t, desc, "Failed to lookup descriptor")
723+
rows := rng.Intn(2000) + 1
724+
cr := &copyReader{rng: rng, cols: desc.PublicColumns(), rows: rows}
722725

723-
err = conn.Exec(ctx, "SET copy_from_atomic_enabled = false")
724-
require.NoError(t, err)
726+
if rng.Float64() < 0.5 {
727+
_, err = sqlDB.Exec("SET copy_from_atomic_enabled = false")
728+
require.NoError(t, err)
729+
}
725730

726-
rng := rand.New(rand.NewSource(0))
727-
rows := 100
728-
numrows, err := conn.GetDriverConn().CopyFrom(ctx,
729-
&copyReader{rng: rng, cols: desc.PublicColumns(), rows: rows},
730-
"COPY lineitem FROM STDIN WITH CSV;")
731-
require.NoError(t, err)
731+
// In 50% cases change the primary key concurrently with the COPY.
732+
doAlterPK := rng.Float64() < 0.5
733+
ignoreErr := func(err error) bool {
734+
if err == nil {
735+
return true
736+
}
737+
if !doAlterPK {
738+
return false
739+
}
740+
// We might hit a duplicate key error when changing the primary key
741+
// (which seems somewhat expected), so we'll ignore such an error.
742+
return strings.Contains(err.Error(), "duplicate key") ||
743+
// TODO(yuzefovich): occasionally we get this error, and it's not
744+
// clear why. Look into this.
745+
strings.Contains(err.Error(), "cannot disable pipelining on a running transaction")
746+
}
747+
alterPKCh := make(chan error)
748+
if doAlterPK {
749+
// We'll delay starting the ALTER to exercise different scenarios.
750+
sleepMillis := rng.Intn(2000)
751+
go func() {
752+
defer close(alterPKCh)
753+
time.Sleep(time.Duration(sleepMillis) * time.Millisecond)
754+
_, err := sqlDB.Exec("ALTER TABLE lineitem ALTER PRIMARY KEY USING COLUMNS (l_orderkey, l_linenumber, l_suppkey);")
755+
alterPKCh <- err
756+
}()
757+
// Delay starting the COPY too.
758+
time.Sleep(time.Duration(rng.Intn(2000)) * time.Millisecond)
759+
} else {
760+
close(alterPKCh)
761+
}
762+
numrows, copyErr := conn.GetDriverConn().CopyFrom(ctx, cr, "COPY lineitem FROM STDIN WITH CSV;")
763+
alterErr := <-alterPKCh
764+
if !ignoreErr(copyErr) {
765+
t.Fatal(copyErr)
766+
}
767+
if !ignoreErr(alterErr) {
768+
t.Fatal(alterErr)
769+
}
732770
require.Equal(t, int(numrows), rows)
733771
}
734772

pkg/sql/copy_from.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,19 @@ func (c *copyMachine) canSupportVectorized(table catalog.TableDescriptor) bool {
428428
if len(table.VectorIndexes()) > 0 {
429429
return false
430430
}
431+
forcePut := table.GetPrimaryIndex().ForcePut()
432+
secondaryIndexes := table.WritableNonPrimaryIndexes()
433+
for i := 0; !forcePut && i < len(secondaryIndexes); i++ {
434+
forcePut = secondaryIndexes[i].ForcePut()
435+
}
436+
if forcePut {
437+
// Even though the vector encoder supports ForcePut behavior, testing
438+
// COPY with a concurrent ALTER PRIMARY KEY has resulted in different
439+
// corruption scenarios. The non-vectorized COPY doesn't hit those, so
440+
// we choose to fall back.
441+
// TODO(#157198): investigate this.
442+
return false
443+
}
431444
// Vectorized COPY doesn't support foreign key checks, no reason it couldn't
432445
// but it doesn't work right now because we don't have the ability to
433446
// hold the results in a bufferNode. We wouldn't want to enable it

0 commit comments

Comments (0)