Commit ff83979

craig[bot], log-head, jeffswenson, dhartunian, and bghal committed
154076: cmd/roachtest: fix cdc/multi-region-execution-locality-tpcc r=andyyang890 a=log-head

This roachtest hangs waiting for the changefeed to complete, so the changefeed is changed to an initial scan. The test was also flaky; this is fixed by removing the check of the exact span distribution and checking only that more than one aggregator was planned. Previously, not all leaseholders were guaranteed to be in the same region; now all leaseholders are placed in the specified region. The test is also added to the nightly test suite.

Epic: CRDB-38755
Fixes: #153825
Release note: None

154486: cloud: have resumable reader manage span lifetime r=jeffswenson a=jeffswenson

This change adds a span to cloud.ResumableReader that is tracked for the lifetime of the reader. This ensures that if the reader implementation needs to issue a new HTTP request, the span is still valid and can be used to annotate logs.

Fixes: #153347
Release note: none

154563: metric: permit counter resets in update assertion r=jasonlmfong a=dhartunian

When we call `Update` on counter metrics, we have an assertion that panics in tests if the counter is set to a lower value than the prior update. In practice, counters are permitted to reset, and third-party metric systems expect this to happen occasionally. The assertion logic is adjusted to permit setting the counter to zero even when that is lower than the previous value.

Resolves: #154364
Resolves: #154365
Resolves: #154366
Resolves: #154367
Resolves: #154368
Epic: None
Release note: None

154639: sql: skip a big test that ooms on the race detector r=bghal a=bghal

The test still passes without the race detector, and the smaller tests still pass with the race detector.

Epic: none
Fixes: #154140
Fixes: #154243
Release note: None

Co-authored-by: Matthew Lougheed <[email protected]>
Co-authored-by: Jeff Swenson <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Brendan Gerrity <[email protected]>
5 parents: b0f5e8c + 9933c23 + 11165de + f969221 + 0aa65d0

File tree

7 files changed (+85, -66 lines)


pkg/cloud/azure/azure_storage.go

Lines changed: 1 addition & 0 deletions
@@ -366,6 +366,7 @@ func (s *azureStorage) ReadFile(
 			}
 		}
 	}
+	// BUG: we should follow the azure retry setting here.
 	reader := resp.NewRetryReader(ctx, &azblob.RetryReaderOptions{MaxRetries: 3})
 	return ioctx.ReadCloserAdapter(reader), fileSize, nil
 }

pkg/cloud/cloud_io.go

Lines changed: 23 additions & 7 deletions
@@ -242,6 +242,7 @@ type ReaderOpenerAt func(ctx context.Context, pos int64) (io.ReadCloser, int64,
 type ResumingReader struct {
 	Opener     ReaderOpenerAt // Get additional content
 	Reader     io.ReadCloser  // Currently opened reader
+	ReaderSpan *tracing.Span  // Span for the current reader, if Reader is non-nil
 	Filename   string         // Used for logging
 	Pos        int64          // How much data was received so far
 	Size       int64          // Total size of the file
@@ -284,6 +285,10 @@ func NewResumingReader(

 // Open opens the reader at its current offset.
 func (r *ResumingReader) Open(ctx context.Context) error {
+	if r.Reader != nil {
+		return errors.AssertionFailedf("reader already open")
+	}
+
 	if r.Size > 0 && r.Pos >= r.Size {
 		// Don't try to open a file if the size has been set and the position is
 		// at size. This generally results in an invalid range error for the
@@ -294,10 +299,18 @@ func (r *ResumingReader) Open(ctx context.Context) error {

 	return DelayedRetry(ctx, "Open", r.ErrFn, func() error {
 		var readErr error
+
+		ctx, span := tracing.ForkSpan(ctx, "resuming-reader")
 		r.Reader, r.Size, readErr = r.Opener(ctx, r.Pos)
 		if readErr != nil {
+			span.Finish()
 			return errors.Wrapf(readErr, "open %s", r.Filename)
 		}
+
+		// We hold onto the span for the lifetime of the reader because the reader
+		// may issue new HTTP requests after Open returns.
+		r.ReaderSpan = span
+
 		return nil
 	})
 }
@@ -340,10 +353,9 @@ func (r *ResumingReader) Read(ctx context.Context, p []byte) (int, error) {
 			}
 			log.Dev.Errorf(ctx, "Retry IO error: %s", lastErr)
 			lastErr = nil
-			if r.Reader != nil {
-				r.Reader.Close()
-			}
-			r.Reader = nil
+			// Ignore the error from Close(). We are already handling a read error
+			// so we know the handle is in a bad state.
+			_ = r.Close(ctx)
 		}
 	}

@@ -356,10 +368,14 @@ func (r *ResumingReader) Read(ctx context.Context, p []byte) (int, error) {

 // Close implements io.Closer.
 func (r *ResumingReader) Close(ctx context.Context) error {
-	if r.Reader != nil {
-		return r.Reader.Close()
+	if r.Reader == nil {
+		return nil
 	}
-	return nil
+
+	err := r.Reader.Close()
+	r.ReaderSpan.Finish()
+	r.Reader = nil
+	return err
 }

 // CheckHTTPContentRangeHeader parses Content-Range header and ensures that
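For context on the pattern introduced above, here is a minimal, self-contained Go sketch of holding a span for the lifetime of a reader: create the span when the stream is opened, keep it alive while the reader may still issue requests, and finish it exactly once in Close. The trackedReader type and the toy span below are hypothetical stand-ins, not the CockroachDB cloud or tracing APIs.

package main

import (
	"fmt"
	"io"
	"strings"
)

// span is a stand-in for a tracing span whose lifetime must cover every
// request the reader might issue, not just the initial Open call.
type span struct{ name string }

func (s *span) Finish() { fmt.Printf("span %q finished\n", s.name) }

// trackedReader pairs a stream with the span that annotates its requests.
type trackedReader struct {
	r    io.ReadCloser
	span *span // finished exactly once, in Close
}

func open(name string) *trackedReader {
	// The span is created alongside the stream and stored on the reader,
	// mirroring how Open stores ReaderSpan on the ResumingReader.
	return &trackedReader{
		r:    io.NopCloser(strings.NewReader("hello")),
		span: &span{name: name},
	}
}

func (t *trackedReader) Read(p []byte) (int, error) { return t.r.Read(p) }

// Close releases the stream first, then the span, so any error from the
// underlying handle is still reported while the span is valid.
func (t *trackedReader) Close() error {
	err := t.r.Close()
	t.span.Finish()
	t.r = nil
	return err
}

func main() {
	tr := open("resuming-reader")
	buf := make([]byte, 8)
	n, _ := tr.Read(buf)
	fmt.Printf("read %d bytes: %s\n", n, buf[:n])
	_ = tr.Close()
}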

pkg/cloud/cloudtestutils/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -23,6 +23,8 @@ go_library(
         "//pkg/util/syncutil",
         "//pkg/util/sysutil",
         "//pkg/util/timeutil",
+        "//pkg/util/tracing",
+        "//pkg/util/tracing/tracingpb",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",
         "@org_golang_x_sync//errgroup",

pkg/cloud/cloudtestutils/cloud_nemesis.go

Lines changed: 17 additions & 5 deletions
@@ -19,6 +19,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cloud"
 	"github.com/cockroachdb/cockroach/pkg/util/ioctx"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
@@ -39,9 +41,15 @@ func RunCloudNemesisTest(t *testing.T, storage cloud.ExternalStorage) {
 		listConcurrency: 1,
 	}

-	// We create a context here because we don't want to support a caller supplied
-	// cancelation signal.
-	ctx := context.Background()
+	// Create a root tracing span for the test to ensure ops create spans
+	// which will help flush out span lifetime bugs.
+	tracer := tracing.NewTracerWithOpt(
+		context.Background(),
+		tracing.WithUseAfterFinishOpt(true, true),
+		tracing.WithSpanReusePercent(0))
+	ctx, rootSpan := tracer.StartSpanCtx(context.Background(), "cloud-nemesis-test", tracing.WithRecording(tracingpb.RecordingStructured))
+	defer rootSpan.Finish()
+
 	if err := nemesis.run(ctx, 2*time.Minute); err != nil {
 		t.Fatalf("%+v", err)
 	}
@@ -85,9 +93,13 @@ func (c *cloudNemesis) run(ctx context.Context, duration time.Duration) error {
 	g, ctx := errgroup.WithContext(ctx)

 	g.Go(func() error {
-		time.Sleep(duration)
+		wait := time.NewTimer(duration)
+		select {
+		case <-ctx.Done():
+		case <-wait.C:
+		}
 		close(done)
-		return nil
+		return ctx.Err()
 	})

 	for i := 0; i < c.writeConcurrency; i++ {
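The nemesis change above replaces a bare time.Sleep with a timer that also respects context cancellation, so the duration goroutine exits early if another goroutine in the errgroup fails. A standalone sketch of that select pattern follows; the names and durations are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrCancel blocks until either the duration elapses or the context is
// canceled, returning ctx.Err() so an errgroup can surface the cancellation.
func waitOrCancel(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop() // release the timer if we exit via cancellation
	select {
	case <-ctx.Done():
	case <-timer.C:
	}
	return ctx.Err()
}

func main() {
	// Cancel after 50ms even though we asked to wait a full second.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	start := time.Now()
	err := waitOrCancel(ctx, time.Second)
	fmt.Printf("returned after %v with err=%v\n", time.Since(start).Round(10*time.Millisecond), err)
}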

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 39 additions & 52 deletions
@@ -20,6 +20,7 @@ import (
 	"encoding/pem"
 	"fmt"
 	"io"
+	"maps"
 	"math/big"
 	"math/rand"
 	"net"
@@ -31,6 +32,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"runtime"
+	"slices"
 	"sort"
 	"strconv"
 	"strings"
@@ -1950,26 +1952,18 @@ func getDiagramProcessors(ctx context.Context, db *gosql.DB) ([]any, error) {
 }

 type ChangefeedDistribution struct {
-	NodeToSpansWatched map[int]int
 	ZoneToSpansWatched map[string]int
 	TotalSpansWatched  int
 	TotalAggregators   int
-	TotalLeaseHolders  int
-	TotalRanges        int
-	NodeToZone         map[int]string
 }

 func getChangefeedDistribution(
 	processors []any, nodeToZone map[int]string, t test.Test,
 ) ChangefeedDistribution {
 	changefeedDistribution := ChangefeedDistribution{
-		NodeToSpansWatched: make(map[int]int),
 		ZoneToSpansWatched: make(map[string]int),
 		TotalSpansWatched:  0,
 		TotalAggregators:   0,
-		TotalLeaseHolders:  0,
-		TotalRanges:        0,
-		NodeToZone:         nodeToZone,
 	}
 	for _, p := range processors {
 		procMap, ok := p.(map[string]any)
@@ -1992,10 +1986,8 @@ func getChangefeedDistribution(
 			if len(matches) > 1 {
 				numWatches, err := strconv.Atoi(matches[1])
 				require.NoError(t, err)
-				changefeedDistribution.NodeToSpansWatched[int(nodeIdx)] += numWatches
 				changefeedDistribution.TotalSpansWatched += numWatches
-				changefeedDistribution.ZoneToSpansWatched[changefeedDistribution.NodeToZone[int(nodeIdx)]] += numWatches
-
+				changefeedDistribution.ZoneToSpansWatched[nodeToZone[int(nodeIdx)]] += numWatches
 			}
 		}
 	}
@@ -2004,42 +1996,36 @@
 	return changefeedDistribution
 }

-func veryifyLeaseHolderDistribution(
-	db *gosql.DB, t test.Test, nodeToZone map[int]string,
-) map[string]int {
-	var rows *gosql.Rows
-	// Get lease holders for all ranges in tpcc database.
-	leaseHolderQuery := `SELECT r.start_pretty, r.replicas, r.replica_localities, r.lease_holder
-	FROM crdb_internal.ranges r
-	JOIN crdb_internal.tables t ON r.start_pretty like concat('/Table/', t.table_id::STRING,'%')
-	WHERE t.database_name = 'tpcc'`
-	rows, err := db.Query(leaseHolderQuery)
-	zoneToLeaseHolderCount := make(map[string]int)
-	require.NoError(t, err)
-	defer rows.Close()
-	for rows.Next() {
-		var startKeyPretty string
-		var replicas []uint8
-		var replicaLocalities []uint8
-		var leaseHolder int
-		require.NoError(t, rows.Scan(&startKeyPretty, &replicas, &replicaLocalities, &leaseHolder))
-		for indx := range replicas {
-			require.NotEqual(t, replicas[indx], 0)
-			replicas[indx]--
+func verifyLeaseHolderLocality(db *gosql.DB, t test.Test, primaryRegion string) {
+	leaseHolderQuery := `SELECT NOT EXISTS (
+		SELECT 1
+		FROM [SHOW CLUSTER RANGES WITH TABLES, DETAILS]
+		WHERE database_name = 'tpcc'
+		AND (lease_holder_locality IS DISTINCT FROM $1::STRING OR lease_holder_locality IS NULL)
+	)`
+	t.L().Printf("Waiting for all lease holders to be in region %s", primaryRegion)
+	start := timeutil.Now()
+	ok := false
+	for {
+		if timeutil.Since(start) > 5*time.Minute {
+			t.Fatalf("Timeout waiting for lease holders to be in region %s; waited for %s", primaryRegion, timeutil.Since(start))
 		}
-		leaseHolder--
-		zoneToLeaseHolderCount[nodeToZone[leaseHolder]]++
+		require.NoError(t, db.QueryRow(leaseHolderQuery, primaryRegion).Scan(&ok))
+		if ok {
+			break
+		}
+		time.Sleep(time.Second)
 	}
-	return zoneToLeaseHolderCount
 }

 func registerCDC(r registry.Registry) {
 	r.Add(registry.TestSpec{
 		// This test
-		// 1. Creates a cluster with 3 nodes each in us-east and us-west
-		// 2. Runs a tpcc workload, then sets tpcc database to primary region us-west
-		// 3. Creates a changefeed with execution locality set to us-east
-		// 4. Gets the changefeed diagram and creates mappings
+		// 1. Creates a cluster with 3 nodes each in us-east and us-west;
+		// 2. Runs a tpcc workload, then configures the tpcc database to have lease holders in region us-west;
+		// 3. Creates a changefeed with execution locality set to us-east;
+		// 4. Gets the changefeed diagram and creates mappings;
+		// 5. Verifies that spans are assigned to multiple change aggregators in region us-east.

 		// This test is used to verify that ranges are evenly distributed across
 		// change aggregators in the execution_locality region while targeting tables
@@ -2051,7 +2037,7 @@ func registerCDC(r registry.Registry) {
 		Owner:            registry.OwnerCDC,
 		Cluster:          r.MakeClusterSpec(7, spec.Geo(), spec.GatherCores(), spec.GCEZones("us-east1-b,us-west1-b")),
 		CompatibleClouds: registry.OnlyGCE,
-		Suites:           registry.Suites(),
+		Suites:           registry.Suites(registry.Nightly),
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
 			nodeToZone := map[int]string{
 				0: "us-east1-b",
@@ -2064,17 +2050,24 @@
 			ct := newCDCTester(ctx, t, c)
 			defer ct.Close()

-			ct.runTPCCWorkload(tpccArgs{warehouses: 100})
+			ct.runTPCCWorkload(tpccArgs{warehouses: 20})

 			var err error
-			_, err = ct.DB().Exec("ALTER DATABASE tpcc SET PRIMARY REGION 'us-west1'")
+			_, err = ct.DB().Exec(`ALTER DATABASE tpcc
+				CONFIGURE ZONE USING
+				constraints = '{+region=us-west1: 1, +region=us-east1: 1}',
+				lease_preferences = '[[+region=us-west1]]', num_replicas = 3`)
 			require.NoError(t, err)

+			// Verify lease holders are in us-west1-b.
+			verifyLeaseHolderLocality(ct.DB(), t, "cloud=gce,region=us-west1,zone=us-west1-b")
+
 			feed := ct.newChangefeed(feedArgs{
 				sinkType: cloudStorageSink,
 				targets:  allTpccTargets,
 				opts: map[string]string{
 					"execution_locality": "'region=us-east1'",
+					"initial_scan":       "'only'",
 				},
 			})
 			ct.waitForWorkload()
@@ -2083,18 +2076,12 @@
 			processors, err := getDiagramProcessors(ctx, ct.DB())
 			require.NoError(t, err)

+			// Verify changefeed aggregators are distributed across nodes in region us-east.
 			changefeedDistribution := getChangefeedDistribution(processors, nodeToZone, t)
 			require.Greater(t, changefeedDistribution.TotalAggregators, 1)
-			for nodeIdx, spansWatched := range changefeedDistribution.NodeToSpansWatched {
-				require.LessOrEqual(t, spansWatched, changefeedDistribution.TotalSpansWatched/2, "nodeIdx %d watched %d spans, total spans watched %d", nodeIdx, spansWatched, changefeedDistribution.TotalSpansWatched)
-			}
-			require.Equal(t, 1, len(changefeedDistribution.ZoneToSpansWatched))
+			require.ElementsMatch(t, []string{"us-east1-b"}, slices.Collect(maps.Keys(changefeedDistribution.ZoneToSpansWatched)))
 			require.Equal(t, changefeedDistribution.ZoneToSpansWatched["us-east1-b"], changefeedDistribution.TotalSpansWatched)
-			zoneToLeaseHolderCount := veryifyLeaseHolderDistribution(ct.DB(), t, nodeToZone)
-			// Majority of lease holders should be in us-west1-b. Some may not, but most should.
-			if zoneToLeaseHolderCount["us-east1-b"] != 0 {
-				require.Greater(t, zoneToLeaseHolderCount["us-west1-b"]/zoneToLeaseHolderCount["us-east1-b"], 10)
-			}
+			require.Greater(t, changefeedDistribution.TotalSpansWatched, 0)
 		},
 	})
 	r.Add(registry.TestSpec{
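The new verifyLeaseHolderLocality helper above is a poll-until-true loop with a hard deadline. Stripped of the roachtest and SQL plumbing, the shape of that loop looks like the sketch below; pollUntil, the timeout, and the poll interval are placeholders rather than the test's real values.

package main

import (
	"fmt"
	"time"
)

// pollUntil repeatedly calls check until it reports true, sleeping between
// attempts, and fails with an error once the deadline is exceeded.
func pollUntil(check func() (bool, error), timeout, interval time.Duration) error {
	start := time.Now()
	for {
		if time.Since(start) > timeout {
			return fmt.Errorf("timed out after %s waiting for condition", time.Since(start).Round(time.Millisecond))
		}
		ok, err := check()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		time.Sleep(interval)
	}
}

func main() {
	// Simulate a condition (e.g. "all leaseholders moved") that becomes true
	// only after a few polls.
	attempts := 0
	check := func() (bool, error) {
		attempts++
		return attempts >= 3, nil
	}
	if err := pollUntil(check, time.Second, 10*time.Millisecond); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("condition satisfied after %d attempts\n", attempts)
}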

pkg/sql/sem/tree/pretty_test.go

Lines changed: 1 addition & 0 deletions
@@ -227,6 +227,7 @@ func TestPrettyVerify(t *testing.T) {
 func TestPrettyBigStatement(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+	skip.UnderRace(t, "excessive memory usage")

 	// Create a SELECT statement with a 1 million item IN expression. Without
 	// mitigation, this can cause stack overflows - see #91197.

pkg/util/metric/metric.go

Lines changed: 2 additions & 2 deletions
@@ -825,7 +825,7 @@ func (c *Counter) Inc(v int64) {
 // maintained elsewhere.
 func (c *Counter) Update(val int64) {
 	if buildutil.CrdbTestBuild {
-		if prev := c.count.Load(); val < prev {
+		if prev := c.count.Load(); val < prev && val != 0 {
 			panic(fmt.Sprintf("Counters should not decrease, prev: %d, new: %d.", prev, val))
 		}
 	}
@@ -1446,7 +1446,7 @@ func (cv *CounterVec) Update(labels map[string]string, v int64) {
 	}

 	currentValue := cv.Count(labels)
-	if currentValue > v {
+	if currentValue > v && v != 0 {
 		panic(fmt.Sprintf("Counters should not decrease, prev: %d, new: %d.", currentValue, v))
 	}
