
Commit 57a9a63

craig[bot] committed (co-authored by asg0451, fqazi, and arulajmani)
143991: changefeedccl: add roachtest and parallel test for enriched envelope r=andyyang890 a=asg0451

Add a roachtest and a parallelized test for enriched envelope feeds.

Part of: #139660
Release note: None

144073: catalog/lease: fix race condition releasing old versions r=fqazi a=fqazi

Previously, a race condition existed between the name cache and the logic used to clean up old versions of leased descriptors. This issue was introduced when atomic operations were used to manipulate the descriptor version states. Specifically, after ensuring the reference count is zero, it's necessary to acquire a lock before releasing the stored lease. This patch adds logic to acquire and release a mutex when cleaning up old versions.

Fixes: #143815
Release note: None

144093: kvclient: enable savepoint rollbacks with buffered writes r=arulajmani a=arulajmani

First commit from #144058

----

This patch correctly handles savepoint rollbacks when buffered writes are enabled. Whenever a savepoint is rolled back, any writes that belong to the savepoint are removed from the buffer.

Closes #139059
Release note: None

Co-authored-by: Miles Frankel <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
4 parents 9fef4b2 + 409e144 + a636200 + 02d1f93 commit 57a9a63

File tree: 7 files changed (+390, -22 lines)


pkg/ccl/changefeedccl/cdctest/validator.go

Lines changed: 11 additions & 6 deletions
@@ -1017,21 +1017,26 @@ func ParseJSONValueTimestamps(v []byte) (updated, resolved hlc.Timestamp, err error) {
 	var valueRaw struct {
 		Resolved string `json:"resolved"`
 		Updated  string `json:"updated"`
+		Source   struct {
+			TsHLC string `json:"ts_hlc"`
+		} `json:"source"`
 	}
 	if err := gojson.Unmarshal(v, &valueRaw); err != nil {
 		return hlc.Timestamp{}, hlc.Timestamp{}, errors.Wrapf(err, "parsing [%s] as json", v)
 	}
+
 	if valueRaw.Updated != `` {
-		var err error
-		updated, err = hlc.ParseHLC(valueRaw.Updated)
-		if err != nil {
+		if updated, err = hlc.ParseHLC(valueRaw.Updated); err != nil {
 			return hlc.Timestamp{}, hlc.Timestamp{}, err
 		}
 	}
 	if valueRaw.Resolved != `` {
-		var err error
-		resolved, err = hlc.ParseHLC(valueRaw.Resolved)
-		if err != nil {
+		if resolved, err = hlc.ParseHLC(valueRaw.Resolved); err != nil {
+			return hlc.Timestamp{}, hlc.Timestamp{}, err
+		}
+	}
+	if valueRaw.Source.TsHLC != `` {
+		if updated, err = hlc.ParseHLC(valueRaw.Source.TsHLC); err != nil {
 			return hlc.Timestamp{}, hlc.Timestamp{}, err
 		}
 	}
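The parser change above is what lets the validator read timestamps from enriched-envelope messages, where the commit timestamp lives under `source.ts_hlc` instead of the top-level `updated` field. A minimal, standalone sketch of that payload shape (the JSON below is hypothetical and uses only the standard library, not CockroachDB code; in the real validator the extracted string is then fed through hlc.ParseHLC as the diff shows):

```go
// Standalone illustration of why the validator needs the new Source.TsHLC
// field: with envelope='enriched', the timestamp is nested under "source".
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical enriched-envelope fragment; only the nested source field
	// matters here.
	payload := []byte(`{"source": {"ts_hlc": "1743619484032571000.0000000001"}}`)
	var valueRaw struct {
		Updated string `json:"updated"`
		Source  struct {
			TsHLC string `json:"ts_hlc"`
		} `json:"source"`
	}
	if err := json.Unmarshal(payload, &valueRaw); err != nil {
		panic(err)
	}
	// The enriched envelope carries the timestamp here instead of "updated".
	fmt.Println(valueRaw.Source.TsHLC)
}
```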

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 93 additions & 0 deletions
@@ -4193,6 +4193,99 @@ func TestChangefeedEnriched(t *testing.T) {
 	}
 }
 
+// TestChangefeedsParallelEnriched tests that multiple changefeeds can run in
+// parallel with the enriched envelope. It is most useful under race, to ensure
+// that there are no accidental data races in the encoders and source providers.
+func TestChangefeedsParallelEnriched(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	const numFeeds = 10
+	const maxIterations = 1_000_000_000
+	const maxRows = 100
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		db := sqlutils.MakeSQLRunner(s.DB)
+		db.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
+
+		ctx, cancel := context.WithCancel(ctx)
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			db := sqlutils.MakeSQLRunner(s.Server.SQLConn(t))
+			var i int
+			for i = 0; i < maxIterations && ctx.Err() == nil; i++ {
+				db.Exec(t, `UPSERT INTO d.foo VALUES ($1, $2)`, i%maxRows, fmt.Sprintf("hello %d", i))
+			}
+		}()
+
+		opts := `envelope='enriched'`
+
+		_, isKafka := f.(*kafkaFeedFactory)
+		useAvro := isKafka && rand.Intn(2) == 0
+		if useAvro {
+			t.Logf("using avro")
+			opts += `, format='avro'`
+		}
+		var feeds []cdctest.TestFeed
+		for range numFeeds {
+			feed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH %s`, opts))
+			feeds = append(feeds, feed)
+		}
+
+		// consume from the feeds
+		for _, feed := range feeds {
+			feed := feed
+			msgCount := 0
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for ctx.Err() == nil {
+					_, err := feed.Next()
+					if err != nil {
+						if errors.Is(err, context.Canceled) {
+							t.Errorf("error reading from feed: %v", err)
+						}
+						break
+					}
+					msgCount++
+				}
+				assert.GreaterOrEqual(t, msgCount, 0)
+			}()
+		}
+
+		// let the feeds run for a few seconds
+		select {
+		case <-time.After(5 * time.Second):
+		case <-ctx.Done():
+			t.Fatalf("%v", ctx.Err())
+		}
+
+		cancel()
+
+		for _, feed := range feeds {
+			closeFeed(t, feed)
+		}
+
+		doneWaiting := make(chan struct{})
+		go func() {
+			defer close(doneWaiting)
+			wg.Wait()
+		}()
+		select {
+		case <-doneWaiting:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timed out waiting for goroutines to finish")
+		}
+	}
+	// Sinkless testfeeds have some weird shutdown behaviours, so exclude them for now.
+	cdcTest(t, testFn, feedTestRestrictSinks("kafka", "pubsub", "webhook"))
+}
+
 func TestChangefeedEnrichedAvro(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
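The new parallel test shuts down by waiting on the WaitGroup through a separate channel so it can time out instead of hanging. A small standalone sketch of that pattern (nothing here is CockroachDB-specific):

```go
// waitWithTimeout waits for a WaitGroup but gives up after a deadline,
// mirroring the doneWaiting/select shutdown step in the test above.
package main

import (
	"fmt"
	"sync"
	"time"
)

func waitWithTimeout(wg *sync.WaitGroup, d time.Duration) error {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		return nil
	case <-time.After(d):
		return fmt.Errorf("timed out after %s waiting for goroutines", d)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(100 * time.Millisecond) // simulated work
	}()
	if err := waitWithTimeout(&wg, 5*time.Second); err != nil {
		panic(err)
	}
	fmt.Println("all goroutines finished")
}
```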

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 40 additions & 5 deletions
@@ -517,6 +517,7 @@ func makeDefaultFeatureFlags() cdcFeatureFlags {
 type feedArgs struct {
 	sinkType       sinkType
 	targets        []string
+	envelope       string
 	opts           map[string]string
 	assumeRole     string
 	tolerateErrors bool
@@ -547,15 +548,15 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
 
 	targetsStr := strings.Join(args.targets, ", ")
 
+	if args.envelope == "" {
+		args.envelope = "wrapped"
+	}
+
 	feedOptions := make(map[string]string)
 	feedOptions["min_checkpoint_frequency"] = "'10s'"
+	feedOptions["envelope"] = args.envelope
 	if args.sinkType == cloudStorageSink || args.sinkType == webhookSink {
-		// Webhook and cloudstorage don't have a concept of keys and therefore
-		// require envelope=wrapped
-		feedOptions["envelope"] = "wrapped"
-
 		feedOptions["resolved"] = "'10s'"
-
 	} else {
 		feedOptions["resolved"] = ""
 	}
@@ -2311,6 +2312,40 @@ func registerCDC(r registry.Registry) {
 		Timeout: 1 * time.Hour,
 		Run:     runCDCMultipleSchemaChanges,
 	})
+	r.Add(registry.TestSpec{
+		Name:             "cdc/tpcc-100/10min/sink=kafka/envelope=enriched",
+		Owner:            registry.OwnerCDC,
+		Benchmark:        true,
+		Cluster:          r.MakeClusterSpec(4, spec.WorkloadNode(), spec.CPU(16)),
+		Leases:           registry.MetamorphicLeases,
+		CompatibleClouds: registry.AllClouds,
+		Suites:           registry.Suites(registry.Nightly),
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			ct := newCDCTester(ctx, t, c)
+			defer ct.Close()
+
+			ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "10m"})
+
+			feed := ct.newChangefeed(feedArgs{
+				sinkType: kafkaSink,
+				envelope: "enriched",
+				targets:  allTpccTargets,
+				kafkaArgs: kafkaFeedArgs{
+					validateOrder: true,
+				},
+				opts: map[string]string{
+					"initial_scan":        "'no'",
+					"updated":             "",
+					"enriched_properties": "source",
+				},
+			})
+			ct.runFeedLatencyVerifier(feed, latencyTargets{
+				initialScanLatency: 3 * time.Minute,
+				steadyLatency:      10 * time.Minute,
+			})
+			ct.waitForWorkload()
+		},
+	})
 }
 
 const (
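The roachtest passes its changefeed options as a map, where an empty value (such as "updated": "") appears to stand for a bare option while non-empty values become key=value pairs. A hypothetical sketch of how such a map could be rendered into a WITH clause follows; the real roachtest helper may differ, this only illustrates that convention:

```go
// renderOptions turns an options map into a comma-separated WITH clause,
// treating empty values as bare flags. This helper is hypothetical and not
// part of the roachtest framework.
package main

import (
	"fmt"
	"sort"
	"strings"
)

func renderOptions(opts map[string]string) string {
	keys := make([]string, 0, len(opts))
	for k := range opts {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic output
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		if v := opts[k]; v == "" {
			parts = append(parts, k) // bare option, e.g. "updated"
		} else {
			parts = append(parts, fmt.Sprintf("%s=%s", k, v))
		}
	}
	return strings.Join(parts, ", ")
}

func main() {
	opts := map[string]string{
		"envelope":                 "enriched",
		"enriched_properties":      "source",
		"initial_scan":             "'no'",
		"updated":                  "",
		"min_checkpoint_frequency": "'10s'",
	}
	fmt.Println("CREATE CHANGEFEED FOR ... WITH " + renderOptions(opts))
}
```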

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 40 additions & 4 deletions
@@ -9,6 +9,7 @@ import (
 	"context"
 	"encoding/binary"
 	"slices"
+	"sort"
 	"unsafe"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -423,7 +424,36 @@ func (twb *txnWriteBuffer) epochBumpedLocked() {}
 func (twb *txnWriteBuffer) createSavepointLocked(context.Context, *savepoint) {}
 
 // rollbackToSavepointLocked is part of the txnInterceptor interface.
-func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s savepoint) {}
+func (twb *txnWriteBuffer) rollbackToSavepointLocked(ctx context.Context, s savepoint) {
+	toDelete := make([]*bufferedWrite, 0)
+	it := twb.buffer.MakeIter()
+	for it.First(); it.Valid(); it.Next() {
+		bufferedVals := it.Cur().vals
+		// NB: the savepoint is being rolled back to s.seqNum (inclusive). So,
+		// idx is the index of the first value that is considered rolled back.
+		idx := sort.Search(len(bufferedVals), func(i int) bool {
+			return bufferedVals[i].seq >= s.seqNum
+		})
+		if idx == len(bufferedVals) {
+			// No writes are being rolled back.
+			continue
+		}
+		// Update size bookkeeping for the values we're rolling back.
+		for i := idx; i < len(bufferedVals); i++ {
+			twb.bufferSize -= bufferedVals[i].size()
+		}
+		// Rollback writes by truncating the buffered values.
+		it.Cur().vals = bufferedVals[:idx]
+		if len(it.Cur().vals) == 0 {
+			// All writes have been rolled back; we should remove this key from
+			// the buffer entirely.
+			toDelete = append(toDelete, it.Cur())
+		}
+	}
+	for _, bw := range toDelete {
+		twb.removeFromBuffer(bw)
+	}
+}
 
 // closeLocked implements the txnInterceptor interface.
 func (twb *txnWriteBuffer) closeLocked() {}
@@ -1057,6 +1087,12 @@ func (twb *txnWriteBuffer) addToBuffer(key roachpb.Key, val roachpb.Value, seq e
 	}
 }
 
+// removeFromBuffer removes all buffered writes on a given key from the buffer.
+func (twb *txnWriteBuffer) removeFromBuffer(bw *bufferedWrite) {
+	twb.buffer.Delete(bw)
+	twb.bufferSize -= bw.size()
+}
+
 // flushBufferAndSendBatch flushes all buffered writes when sending the supplied
 // batch request to the KV layer. This is done by pre-pending the buffered
 // writes to the requests in the batch.
@@ -1092,11 +1128,11 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
 
 	reqs := make([]kvpb.RequestUnion, 0, numBuffered+len(ba.Requests))
 
-	// Next, remove the buffered writes from the buffer and collect them into requests.
+	// Next, remove the buffered writes from the buffer and collect them into
+	// requests.
 	for _, bw := range toFlushBufferedWrites {
 		reqs = append(reqs, bw.toRequest())
-		twb.buffer.Delete(&bw)
-		twb.bufferSize -= bw.size()
+		twb.removeFromBuffer(&bw)
 	}
 
 	// Layers below us expect that writes inside a batch are in sequence number
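The heart of the rollback is the sort.Search call: buffered values for a key are kept in sequence-number order, so the first index with seq >= s.seqNum marks where the rollback starts, and everything from that index onward is truncated. A standalone sketch with simplified types (not the real kvcoord structures):

```go
// Demonstrates the truncation step of rollbackToSavepointLocked using plain
// slices instead of the write buffer's B-tree of bufferedWrites.
package main

import (
	"fmt"
	"sort"
)

type bufferedValue struct {
	seq int
	val string
}

// rollbackVals returns vals with every write at or above seqNum removed,
// mirroring how a key's value slice is truncated on savepoint rollback.
func rollbackVals(vals []bufferedValue, seqNum int) []bufferedValue {
	idx := sort.Search(len(vals), func(i int) bool {
		return vals[i].seq >= seqNum
	})
	return vals[:idx]
}

func main() {
	vals := []bufferedValue{{seq: 10, val: "valA"}, {seq: 12, val: "valA2"}}
	// Rolling back to a savepoint created at sequence 11 discards seq 12;
	// if nothing remained, the key would be dropped from the buffer entirely.
	fmt.Println(rollbackVals(vals, 11)) // [{10 valA}]
}
```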

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 98 additions & 0 deletions
@@ -1614,3 +1614,101 @@ func TestTxnWriteBufferDeleteRange(t *testing.T) {
 	require.Len(t, br.Responses, 1)
 	require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
 }
+
+// TestTxnWriteBufferRollbackToSavepoint tests the savepoint rollback logic.
+func TestTxnWriteBufferRollbackToSavepoint(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
+
+	txn := makeTxnProto()
+	txn.Sequence = 10
+	keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
+	valA, valA2, valB := "valA", "valA2", "valB"
+
+	ba := &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	putA := putArgs(keyA, valA, txn.Sequence)
+	txn.Sequence++
+	delC := delArgs(keyC, txn.Sequence)
+	ba.Add(putA)
+	ba.Add(delC)
+
+	numCalled := mockSender.NumCalled()
+	br, pErr := twb.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	// All the requests should be buffered and not make it past the
+	// txnWriteBuffer. The response returned should be indistinguishable.
+	require.Equal(t, numCalled, mockSender.NumCalled())
+	require.Len(t, br.Responses, 2)
+	require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
+	require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[1].GetInner())
+	// Verify the state of the write buffer.
+	expBufferedWrites := []bufferedWrite{
+		makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
+		makeBufferedWrite(keyC, makeBufferedValue("", 11)),
+	}
+	require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
+
+	// Create a savepoint. This is inclusive of the buffered Delete on keyC.
+	savepoint := &savepoint{seqNum: txn.Sequence}
+	twb.createSavepointLocked(ctx, savepoint)
+
+	// Add some new writes. A second write to keyA and a new one to keyB.
+	ba = &kvpb.BatchRequest{}
+	txn.Sequence++
+	putA2 := putArgs(keyA, valA2, txn.Sequence)
+	ba.Add(putA2)
+	txn.Sequence++
+	putB := putArgs(keyB, valB, txn.Sequence)
+	ba.Add(putB)
+
+	// All these writes should be buffered as well, with no KV requests sent yet.
+	numCalled = mockSender.NumCalled()
+	br, pErr = twb.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+
+	require.Equal(t, numCalled, mockSender.NumCalled())
+	require.Len(t, br.Responses, 2)
+	require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
+	require.IsType(t, &kvpb.PutResponse{}, br.Responses[1].GetInner())
+	// Verify the state of the write buffer.
+	expBufferedWrites = []bufferedWrite{
+		makeBufferedWrite(keyA, makeBufferedValue("valA", 10), makeBufferedValue("valA2", 12)),
+		makeBufferedWrite(keyB, makeBufferedValue("valB", 13)),
+		makeBufferedWrite(keyC, makeBufferedValue("", 11)),
+	}
+	require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
+
+	// Now, roll back to the savepoint. This should leave just one write in the
+	// buffer, that on keyA at seqnum 10.
+	twb.rollbackToSavepointLocked(ctx, *savepoint)
+	expBufferedWrites = []bufferedWrite{
+		makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
+	}
+	require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
+
+	// Commit the transaction.
+	ba = &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	ba.Add(&kvpb.EndTxnRequest{Commit: true})
+
+	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 2)
+		require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner())
+
+		br = ba.CreateReply()
+		br.Txn = ba.Txn
+		return br, nil
+	})
+
+	br, pErr = twb.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.Len(t, br.Responses, 1)
+	require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
+}

pkg/sql/catalog/lease/descriptor_state.go

Lines changed: 9 additions & 4 deletions
@@ -203,10 +203,15 @@ func (t *descriptorState) removeInactiveVersions() []*storedLease {
 	for _, desc := range append([]*descriptorVersionState(nil), t.mu.active.data...) {
 		if desc.refcount.Load() == 0 {
 			t.mu.active.remove(desc)
-			if l := desc.mu.lease; l != nil {
-				desc.mu.lease = nil
-				leases = append(leases, l)
-			}
+			func() {
+				desc.mu.Lock()
+				defer desc.mu.Unlock()
+				if l := desc.mu.lease; l != nil {
+					desc.mu.lease = nil
+					leases = append(leases, l)
+
+				}
+			}()
 		}
 	}
 	return leases
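The fix wraps the lease release in an immediately-invoked function so the deferred Unlock covers only one loop iteration: the atomic refcount check alone does not protect desc.mu.lease, which is guarded by the mutex. A simplified, hypothetical sketch of the same pattern (these types stand in for the real lease-manager structures):

```go
// Scoped-lock pattern: check an atomic refcount, then take the mutex that
// actually guards the pointer before clearing and collecting it.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type lease struct{ id int }

type descriptorVersion struct {
	refcount atomic.Int64
	mu       struct {
		sync.Mutex
		lease *lease
	}
}

func removeInactive(versions []*descriptorVersion) []*lease {
	var released []*lease
	for _, desc := range versions {
		if desc.refcount.Load() != 0 {
			continue
		}
		// The closure scopes the critical section to this iteration; the
		// deferred Unlock runs before the loop advances.
		func() {
			desc.mu.Lock()
			defer desc.mu.Unlock()
			if l := desc.mu.lease; l != nil {
				desc.mu.lease = nil
				released = append(released, l)
			}
		}()
	}
	return released
}

func main() {
	d := &descriptorVersion{}
	d.mu.lease = &lease{id: 1}
	fmt.Println(len(removeInactive([]*descriptorVersion{d}))) // 1
}
```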
