Skip to content

Commit 588257e

Browse files
craig[bot]DarrylWongangles-n-daemonsasg0451
committed
150854: roachtest: don't specify virtual cluster in debug zips r=golgeek,herkolategan a=DarrylWong Previously, we were always passing the system tenant when taking debug zips in case a roachtest had overridden the default virtual cluster. However, the debug zip command already attempts to fetch all virtual clusters by passing the --ccluster flag for each tenant. This behavior was suppressed due to our hard coding of the system tenant, causing all debug zips to be of the system tenant. This changes it to instead pass nothing so the url has no existing --ccluster flag. Fixes: #143841 Release note: none 150933: server: change sort order of local hot ranges to descending r=angles-n-daemons a=angles-n-daemons Currently the hot ranges logging system expects that ranges are sorted in descending order, so that it can inspect the values from greatest to least ([link](https://github.com/cockroachdb/cockroach/blob/master/pkg/server/application_api/storage_inspection_test.go#L377)). This is not the case, as the system currently returns those values in ascending order. To fix this, we just reverse the comparison used for sorting at the local hot ranges collector. Epic: none Fixes: none Release note: none 150964: changefeedccl: support rangefeed bulk delivery r=andyyang890,dt a=asg0451 Add support for the new rangefeed bulk delivery feature in changefeeds. This should improve catchup scan performance. Fixes: #150862 Release note: None 150975: roachtest: fix test failed message r=herkolategan a=DarrylWong Release note: none Epic: none Fixes: none Co-authored-by: DarrylWong <[email protected]> Co-authored-by: Brian Dillmann <[email protected]> Co-authored-by: Miles Frankel <[email protected]>
5 parents 789d7e3 + 1d1112e + 275d4b0 + 1148c1d + 8081532 commit 588257e

File tree

6 files changed

+95
-74
lines changed

6 files changed

+95
-74
lines changed

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

Lines changed: 79 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
9393
}
9494
g := ctxgroup.WithContext(ctx)
9595
g.GoCtx(feed.addEventsToBuffer)
96-
var rfOpts []kvcoord.RangeFeedOption
96+
97+
// Bulk delivery is an optimization that decreases rangefeed overhead during
98+
// catchup scans. It results in the emission of BulkEvents instead of
99+
// individual events where possible.
100+
rfOpts := []kvcoord.RangeFeedOption{kvcoord.WithBulkDelivery()}
97101
if cfg.WithDiff {
98102
rfOpts = append(rfOpts, kvcoord.WithDiff())
99103
}
@@ -132,6 +136,78 @@ func quantizeTS(ts hlc.Timestamp, granularity time.Duration) hlc.Timestamp {
132136
}
133137
}
134138

139+
func (p *rangefeed) handleRangefeedEvent(ctx context.Context, e *kvpb.RangeFeedEvent) error {
140+
switch t := e.GetValue().(type) {
141+
case *kvpb.RangeFeedValue:
142+
if p.cfg.Knobs.OnRangeFeedValue != nil {
143+
if err := p.cfg.Knobs.OnRangeFeedValue(); err != nil {
144+
return err
145+
}
146+
}
147+
stop := p.st.RangefeedBufferValue.Start()
148+
if err := p.memBuf.Add(
149+
ctx, kvevent.MakeKVEvent(e),
150+
); err != nil {
151+
return err
152+
}
153+
stop()
154+
case *kvpb.RangeFeedCheckpoint:
155+
ev := e.ShallowCopy()
156+
ev.Checkpoint.ResolvedTS = quantizeTS(ev.Checkpoint.ResolvedTS, p.cfg.WithFrontierQuantize)
157+
if resolvedTs := ev.Checkpoint.ResolvedTS; !resolvedTs.IsEmpty() && resolvedTs.Less(p.cfg.Frontier) {
158+
// RangeFeed happily forwards any closed timestamps it receives as
159+
// soon as there are no outstanding intents under them.
160+
// Changefeeds don't care about these at all, so throw them out.
161+
return nil
162+
}
163+
if p.knobs.ShouldSkipCheckpoint != nil && p.knobs.ShouldSkipCheckpoint(t) {
164+
return nil
165+
}
166+
stop := p.st.RangefeedBufferCheckpoint.Start()
167+
if err := p.memBuf.Add(
168+
ctx, kvevent.MakeResolvedEvent(ev, jobspb.ResolvedSpan_NONE),
169+
); err != nil {
170+
return err
171+
}
172+
stop()
173+
case *kvpb.RangeFeedSSTable:
174+
// For now, we just error on SST ingestion, since we currently don't
175+
// expect SST ingestion into spans with active changefeeds.
176+
return errors.Errorf("unexpected SST ingestion: %v", t)
177+
case *kvpb.RangeFeedBulkEvents:
178+
// TODO(#138346): We can handle these more gracefully once we
179+
// migrate to the new rangefeed client. Until then, this is
180+
// still an improvement over not using these.
181+
for _, e := range t.Events {
182+
if err := p.handleRangefeedEvent(ctx, e); err != nil {
183+
return err
184+
}
185+
}
186+
case *kvpb.RangeFeedDeleteRange:
187+
// For now, we just ignore on MVCC range tombstones. These are currently
188+
// only expected to be used by schema GC and IMPORT INTO, and such spans
189+
// should not have active changefeeds across them, at least at the times
190+
// of interest. A case where one will show up in a changefeed is when
191+
// the primary index changes while we're watching it and then the old
192+
// primary index is dropped. In this case, we'll get a schema event to
193+
// restart into the new primary index, but the DeleteRange may come
194+
// through before the schema event.
195+
//
196+
// TODO(erikgrinaker): Write an end-to-end test which verifies that an
197+
// IMPORT INTO which gets rolled back using MVCC range tombstones will
198+
// not be visible to a changefeed, neither when it was started before
199+
// the import or when resuming from a timestamp before the import. The
200+
// table decriptor should be marked as offline during the import, and
201+
// catchup scans should detect that this happened and prevent reading
202+
// anything in that timespan. See:
203+
// https://github.com/cockroachdb/cockroach/issues/70433
204+
return nil
205+
default:
206+
return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
207+
}
208+
return nil
209+
}
210+
135211
// addEventsToBuffer consumes rangefeed events from `p.eventCh`, transforms
136212
// them to kvevent.Event's, and pushes them into `p.memBuf`.
137213
func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
@@ -141,69 +217,8 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
141217
for {
142218
select {
143219
case e := <-p.eventCh:
144-
switch t := e.GetValue().(type) {
145-
case *kvpb.RangeFeedValue:
146-
if p.cfg.Knobs.OnRangeFeedValue != nil {
147-
if err := p.cfg.Knobs.OnRangeFeedValue(); err != nil {
148-
return err
149-
}
150-
}
151-
stop := p.st.RangefeedBufferValue.Start()
152-
if err := p.memBuf.Add(
153-
ctx, kvevent.MakeKVEvent(e.RangeFeedEvent),
154-
); err != nil {
155-
return err
156-
}
157-
stop()
158-
case *kvpb.RangeFeedCheckpoint:
159-
ev := e.ShallowCopy()
160-
ev.Checkpoint.ResolvedTS = quantizeTS(ev.Checkpoint.ResolvedTS, p.cfg.WithFrontierQuantize)
161-
if resolvedTs := ev.Checkpoint.ResolvedTS; !resolvedTs.IsEmpty() && resolvedTs.Less(p.cfg.Frontier) {
162-
// RangeFeed happily forwards any closed timestamps it receives as
163-
// soon as there are no outstanding intents under them.
164-
// Changefeeds don't care about these at all, so throw them out.
165-
continue
166-
}
167-
if p.knobs.ShouldSkipCheckpoint != nil && p.knobs.ShouldSkipCheckpoint(t) {
168-
continue
169-
}
170-
stop := p.st.RangefeedBufferCheckpoint.Start()
171-
if err := p.memBuf.Add(
172-
ctx, kvevent.MakeResolvedEvent(ev, jobspb.ResolvedSpan_NONE),
173-
); err != nil {
174-
return err
175-
}
176-
stop()
177-
case *kvpb.RangeFeedSSTable:
178-
// For now, we just error on SST ingestion, since we currently don't
179-
// expect SST ingestion into spans with active changefeeds.
180-
return errors.Errorf("unexpected SST ingestion: %v", t)
181-
case *kvpb.RangeFeedBulkEvents:
182-
// Should be disabled so this is unreachable.
183-
return errors.Errorf("unexpected bulk delivery: %v", t)
184-
185-
case *kvpb.RangeFeedDeleteRange:
186-
// For now, we just ignore on MVCC range tombstones. These are currently
187-
// only expected to be used by schema GC and IMPORT INTO, and such spans
188-
// should not have active changefeeds across them, at least at the times
189-
// of interest. A case where one will show up in a changefeed is when
190-
// the primary index changes while we're watching it and then the old
191-
// primary index is dropped. In this case, we'll get a schema event to
192-
// restart into the new primary index, but the DeleteRange may come
193-
// through before the schema event.
194-
//
195-
// TODO(erikgrinaker): Write an end-to-end test which verifies that an
196-
// IMPORT INTO which gets rolled back using MVCC range tombstones will
197-
// not be visible to a changefeed, neither when it was started before
198-
// the import or when resuming from a timestamp before the import. The
199-
// table decriptor should be marked as offline during the import, and
200-
// catchup scans should detect that this happened and prevent reading
201-
// anything in that timespan. See:
202-
// https://github.com/cockroachdb/cockroach/issues/70433
203-
continue
204-
205-
default:
206-
return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
220+
if err := p.handleRangefeedEvent(ctx, e.RangeFeedEvent); err != nil {
221+
return err
207222
}
208223
case <-ctx.Done():
209224
return ctx.Err()

pkg/cmd/roachtest/cluster.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,12 +1389,14 @@ func (c *clusterImpl) FetchDebugZip(
13891389
for _, node := range nodes {
13901390
pgURLOpts := roachprod.PGURLOptions{
13911391
// `cockroach debug zip` does not support non root authentication.
1392-
Auth: install.AuthRootCert,
1393-
// request the system tenant specifically in case the test
1394-
// changed the default virtual cluster.
1395-
VirtualClusterName: install.SystemInterfaceName,
1392+
Auth: install.AuthRootCert,
1393+
Secure: c.IsSecure(),
13961394
}
1397-
nodePgUrl, err := c.InternalPGUrl(ctx, l, c.Node(node), pgURLOpts)
1395+
// Use roachprod.PgURL directly as we want to bypass the default virtual cluster
1396+
// logic. The debug zip command already handles fetching all virtual clusters by passing
1397+
// the --ccluster for each tenant. Attempting to pass a --ccluster here will override
1398+
// that behavior and cause all debug zips to be of the same tenant.
1399+
urls, err := roachprod.PgURL(ctx, l, c.MakeNodes(c.Node(node)), install.CockroachNodeCertsDir, pgURLOpts)
13981400
if err != nil {
13991401
l.Printf("cluster.FetchDebugZip failed to retrieve PGUrl on node %d: %v", node, err)
14001402
continue
@@ -1409,7 +1411,7 @@ func (c *clusterImpl) FetchDebugZip(
14091411
cmd := roachtestutil.NewCommand("%s debug zip", test.DefaultCockroachPath).
14101412
Option("include-range-info").
14111413
Flag("exclude-files", fmt.Sprintf("'%s'", excludeFiles)).
1412-
Flag("url", fmt.Sprintf("'%s'", nodePgUrl[0])).
1414+
Flag("url", urls[0]).
14131415
MaybeFlag(c.IsSecure(), "certs-dir", install.CockroachNodeCertsDir).
14141416
Arg(zipName).
14151417
String()

pkg/cmd/roachtest/test_runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ func (r *testRunner) Run(
471471
err = errors.Join(err, errSomeClusterProvisioningFailed)
472472
}
473473
if len(r.status.fail) > 0 {
474-
shout(ctx, l, lopt.stdout, "%d tests failed", r.status.fail)
474+
shout(ctx, l, lopt.stdout, "%d tests failed", len(r.status.fail))
475475
err = errors.Join(err, errTestsFailed)
476476
}
477477
if err != nil {

pkg/roachprod/roachprod.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,12 @@ func PgURL(
11731173
if len(urls) != len(nodes) {
11741174
return nil, errors.Errorf("have nodes %v, but urls %v from ips %v", nodes, urls, ips)
11751175
}
1176+
// We should never return an empty list of URLs as roachprod clusters always have at least
1177+
// one node. However, many callers of this function directly index into the slice returned,
1178+
// so check just in case.
1179+
if len(urls) == 0 {
1180+
return nil, errors.Newf("have nodes %v, but no urls were found", nodes)
1181+
}
11761182
return urls, nil
11771183
}
11781184

pkg/server/application_api/storage_inspection_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,6 @@ func TestHotRanges2Response(t *testing.T) {
340340
defer leaktest.AfterTest(t)()
341341
defer log.Scope(t).Close(t)
342342

343-
skip.WithIssue(t, 146917)
344-
345343
srv := rangetestutils.StartServer(t)
346344
defer srv.Stopper().Stop(context.Background())
347345
ts := srv.ApplicationLayer()

pkg/server/status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3069,7 +3069,7 @@ func (s *systemStatusServer) localHotRanges(
30693069

30703070
// sort the slices by cpu
30713071
slices.SortFunc(resp.Ranges, func(a, b *serverpb.HotRangesResponseV2_HotRange) int {
3072-
return cmp.Compare(a.CPUTimePerSecond, b.CPUTimePerSecond)
3072+
return cmp.Compare(b.CPUTimePerSecond, a.CPUTimePerSecond)
30733073
})
30743074

30753075
// truncate the response if localLimit is set

0 commit comments

Comments
 (0)