Skip to content

Commit 409e144

Browse files
committed
changefeedccl: add roachtest and parallel test for enriched envelope
Add a roachtest and a parallelized test for enriched envelope feeds.

Part of: #139660
Release note: None
1 parent 78e2a2e commit 409e144

File tree

3 files changed

+144
-11
lines changed

3 files changed

+144
-11
lines changed

pkg/ccl/changefeedccl/cdctest/validator.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,21 +1017,26 @@ func ParseJSONValueTimestamps(v []byte) (updated, resolved hlc.Timestamp, err er
10171017
var valueRaw struct {
10181018
Resolved string `json:"resolved"`
10191019
Updated string `json:"updated"`
1020+
Source struct {
1021+
TsHLC string `json:"ts_hlc"`
1022+
} `json:"source"`
10201023
}
10211024
if err := gojson.Unmarshal(v, &valueRaw); err != nil {
10221025
return hlc.Timestamp{}, hlc.Timestamp{}, errors.Wrapf(err, "parsing [%s] as json", v)
10231026
}
1027+
10241028
if valueRaw.Updated != `` {
1025-
var err error
1026-
updated, err = hlc.ParseHLC(valueRaw.Updated)
1027-
if err != nil {
1029+
if updated, err = hlc.ParseHLC(valueRaw.Updated); err != nil {
10281030
return hlc.Timestamp{}, hlc.Timestamp{}, err
10291031
}
10301032
}
10311033
if valueRaw.Resolved != `` {
1032-
var err error
1033-
resolved, err = hlc.ParseHLC(valueRaw.Resolved)
1034-
if err != nil {
1034+
if resolved, err = hlc.ParseHLC(valueRaw.Resolved); err != nil {
1035+
return hlc.Timestamp{}, hlc.Timestamp{}, err
1036+
}
1037+
}
1038+
if valueRaw.Source.TsHLC != `` {
1039+
if updated, err = hlc.ParseHLC(valueRaw.Source.TsHLC); err != nil {
10351040
return hlc.Timestamp{}, hlc.Timestamp{}, err
10361041
}
10371042
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4193,6 +4193,99 @@ func TestChangefeedEnriched(t *testing.T) {
41934193
}
41944194
}
41954195

4196+
// TestChangefeedsParallelEnriched tests that multiple changefeeds can run in
4197+
// parallel with the enriched envelope. It is most useful under race, to ensure
4198+
// that there is no accidental data races in the encoders and source providers.
4199+
func TestChangefeedsParallelEnriched(t *testing.T) {
4200+
defer leaktest.AfterTest(t)()
4201+
defer log.Scope(t).Close(t)
4202+
ctx := context.Background()
4203+
4204+
const numFeeds = 10
4205+
const maxIterations = 1_000_000_000
4206+
const maxRows = 100
4207+
4208+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
4209+
db := sqlutils.MakeSQLRunner(s.DB)
4210+
db.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
4211+
4212+
ctx, cancel := context.WithCancel(ctx)
4213+
4214+
var wg sync.WaitGroup
4215+
wg.Add(1)
4216+
go func() {
4217+
defer wg.Done()
4218+
db := sqlutils.MakeSQLRunner(s.Server.SQLConn(t))
4219+
var i int
4220+
for i = 0; i < maxIterations && ctx.Err() == nil; i++ {
4221+
db.Exec(t, `UPSERT INTO d.foo VALUES ($1, $2)`, i%maxRows, fmt.Sprintf("hello %d", i))
4222+
}
4223+
}()
4224+
4225+
opts := `envelope='enriched'`
4226+
4227+
_, isKafka := f.(*kafkaFeedFactory)
4228+
useAvro := isKafka && rand.Intn(2) == 0
4229+
if useAvro {
4230+
t.Logf("using avro")
4231+
opts += `, format='avro'`
4232+
}
4233+
var feeds []cdctest.TestFeed
4234+
for range numFeeds {
4235+
feed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH %s`, opts))
4236+
feeds = append(feeds, feed)
4237+
}
4238+
4239+
// consume from the feeds
4240+
for _, feed := range feeds {
4241+
feed := feed
4242+
msgCount := 0
4243+
4244+
wg.Add(1)
4245+
go func() {
4246+
defer wg.Done()
4247+
for ctx.Err() == nil {
4248+
_, err := feed.Next()
4249+
if err != nil {
4250+
if errors.Is(err, context.Canceled) {
4251+
t.Errorf("error reading from feed: %v", err)
4252+
}
4253+
break
4254+
}
4255+
msgCount++
4256+
}
4257+
assert.GreaterOrEqual(t, msgCount, 0)
4258+
}()
4259+
}
4260+
4261+
// let the feeds run for a few seconds
4262+
select {
4263+
case <-time.After(5 * time.Second):
4264+
case <-ctx.Done():
4265+
t.Fatalf("%v", ctx.Err())
4266+
}
4267+
4268+
cancel()
4269+
4270+
for _, feed := range feeds {
4271+
closeFeed(t, feed)
4272+
}
4273+
4274+
doneWaiting := make(chan struct{})
4275+
go func() {
4276+
defer close(doneWaiting)
4277+
wg.Wait()
4278+
}()
4279+
select {
4280+
case <-doneWaiting:
4281+
case <-time.After(5 * time.Second):
4282+
t.Fatalf("timed out waiting for goroutines to finish")
4283+
}
4284+
}
4285+
// Sinkless testfeeds have some weird shutdown behaviours, so exclude them for now.
4286+
cdcTest(t, testFn, feedTestRestrictSinks("kafka", "pubsub", "webhook"))
4287+
}
4288+
41964289
func TestChangefeedEnrichedAvro(t *testing.T) {
41974290
defer leaktest.AfterTest(t)()
41984291
defer log.Scope(t).Close(t)

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ func makeDefaultFeatureFlags() cdcFeatureFlags {
517517
type feedArgs struct {
518518
sinkType sinkType
519519
targets []string
520+
envelope string
520521
opts map[string]string
521522
assumeRole string
522523
tolerateErrors bool
@@ -547,15 +548,15 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
547548

548549
targetsStr := strings.Join(args.targets, ", ")
549550

551+
if args.envelope == "" {
552+
args.envelope = "wrapped"
553+
}
554+
550555
feedOptions := make(map[string]string)
551556
feedOptions["min_checkpoint_frequency"] = "'10s'"
557+
feedOptions["envelope"] = args.envelope
552558
if args.sinkType == cloudStorageSink || args.sinkType == webhookSink {
553-
// Webhook and cloudstorage don't have a concept of keys and therefore
554-
// require envelope=wrapped
555-
feedOptions["envelope"] = "wrapped"
556-
557559
feedOptions["resolved"] = "'10s'"
558-
559560
} else {
560561
feedOptions["resolved"] = ""
561562
}
@@ -2311,6 +2312,40 @@ func registerCDC(r registry.Registry) {
23112312
Timeout: 1 * time.Hour,
23122313
Run: runCDCMultipleSchemaChanges,
23132314
})
2315+
r.Add(registry.TestSpec{
2316+
Name: "cdc/tpcc-100/10min/sink=kafka/envelope=enriched",
2317+
Owner: registry.OwnerCDC,
2318+
Benchmark: true,
2319+
Cluster: r.MakeClusterSpec(4, spec.WorkloadNode(), spec.CPU(16)),
2320+
Leases: registry.MetamorphicLeases,
2321+
CompatibleClouds: registry.AllClouds,
2322+
Suites: registry.Suites(registry.Nightly),
2323+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
2324+
ct := newCDCTester(ctx, t, c)
2325+
defer ct.Close()
2326+
2327+
ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "10m"})
2328+
2329+
feed := ct.newChangefeed(feedArgs{
2330+
sinkType: kafkaSink,
2331+
envelope: "enriched",
2332+
targets: allTpccTargets,
2333+
kafkaArgs: kafkaFeedArgs{
2334+
validateOrder: true,
2335+
},
2336+
opts: map[string]string{
2337+
"initial_scan": "'no'",
2338+
"updated": "",
2339+
"enriched_properties": "source",
2340+
},
2341+
})
2342+
ct.runFeedLatencyVerifier(feed, latencyTargets{
2343+
initialScanLatency: 3 * time.Minute,
2344+
steadyLatency: 10 * time.Minute,
2345+
})
2346+
ct.waitForWorkload()
2347+
},
2348+
})
23142349
}
23152350

23162351
const (

0 commit comments

Comments
 (0)