Skip to content

Commit 186ac82

Browse files
committed
changefeedccl: fix unwatched column families memory monitoring bug
This patch fixes a bug where the memory monitor wouldn't reclaim the memory allocated to events corresponding to unwatched column families for a changefeed that targets only a subset of a table's families.

Release note (bug fix): A bug where a changefeed targeting only a subset of a table's column families could become stuck has been fixed.
1 parent 2c55282 commit 186ac82

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ go_test(
203203
"alter_changefeed_test.go",
204204
"changefeed_dist_test.go",
205205
"changefeed_job_info_test.go",
206+
"changefeed_memory_test.go",
206207
"changefeed_processors_test.go",
207208
"changefeed_progress_test.go",
208209
"changefeed_stmt_test.go",
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package changefeedccl
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
13+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
14+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
15+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
18+
)
19+
20+
// TestChangefeedUnwatchedFamilyMemoryMonitoring verifies that changefeeds
21+
// correctly release memory allocations for events corresponding to unwatched
22+
// column families after discarding them. This is a regression test for #154776.
//
// The test caps the per-changefeed memory limit at 1 MiB, starts a changefeed
// on family f1 only, issues 1000 updates of ~2 KiB each (~2 MiB in total) to
// the unwatched family f2, and then asserts that a subsequent update to f1 is
// still emitted by the feed.
23+
func TestChangefeedUnwatchedFamilyMemoryMonitoring(t *testing.T) {
24+
defer leaktest.AfterTest(t)()
25+
defer log.Scope(t).Close(t)
26+
27+
ctx := context.Background()
28+
rnd, _ := randutil.NewTestRand()
29+
30+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
31+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
32+
33+
// Set a low per-changefeed memory limit (1 MiB) so that the volume of
// unwatched-family updates generated below (~2 MiB) is enough to exceed it
// if their allocations are leaked.
34+
changefeedbase.PerChangefeedMemLimit.Override(
35+
ctx, &s.Server.ClusterSettings().SV, 1<<20 /* 1 MiB */)
36+
37+
// Create a table with two column families: f1 (id, a) is the family the
// changefeed will watch, f2 (b) is left unwatched.
38+
sqlDB.Exec(t, `CREATE TABLE foo (
39+
id INT PRIMARY KEY,
40+
a STRING,
41+
b STRING,
42+
FAMILY f1 (id, a),
43+
FAMILY f2 (b)
44+
)`)
45+
46+
// Insert initial data.
47+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a', 'b')`)
48+
49+
// Extra arguments forwarded to the feed helper. For webhook feed factories
// the test opts out of the metamorphic enriched envelope; the reason string
// below records why.
var args []any
50+
if _, ok := f.(*webhookFeedFactory); ok {
51+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{
52+
reason: "enriched envelopes do not support column families for webhook sinks",
53+
})
54+
}
55+
56+
// Start changefeed watching only f1 with diff enabled.
57+
// Events from f2 should trigger ErrUnwatchedFamily.
// Note: from this statement on, the local variable feed shadows the feed
// helper function called on the right-hand side.
58+
feed := feed(t, f,
59+
`CREATE CHANGEFEED FOR foo FAMILY f1 WITH diff, initial_scan='no', resolved`, args...)
60+
defer closeFeed(t, feed)
61+
62+
// Update a watched column to generate an event.
63+
sqlDB.Exec(t, `UPDATE foo SET a = 'a_1' WHERE id = 1`)
64+
assertPayloads(t, feed, []string{
65+
`foo.f1: [1]->{"after": {"a": "a_1", "id": 1}, "before": {"a": "a", "id": 1}}`,
66+
})
67+
68+
// Generate a lot of events for the unwatched family. If the memory
69+
// allocations are being leaked, this would cause the changefeed to
70+
// exceed the previously configured 1 MiB limit and become stuck
71+
// when it attempts to process more events after the limit is hit.
72+
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
73+
for range 1000 {
74+
// Each update will create an event of size ~2 KiB
75+
// (~1 KiB for each of before/after).
76+
data := make([]byte, 1<<10 /* 1 KiB */)
77+
// Fill the 1 KiB payload with characters drawn from charset using the
// test's random source.
for i := range data {
78+
data[i] = charset[rnd.Intn(len(charset))]
79+
}
80+
sqlDB.Exec(t, `UPDATE foo SET b = $1 WHERE id = 1`, data)
81+
}
82+
83+
// Update watched column again to verify the feed is still progressing.
84+
// If the memory allocations leaked, this assertion would time out
85+
// because the changefeed would be stuck.
86+
sqlDB.Exec(t, `UPDATE foo SET a = 'a_2' WHERE id = 1`)
87+
assertPayloads(t, feed, []string{
88+
`foo.f1: [1]->{"after": {"a": "a_2", "id": 1}, "before": {"a": "a_1", "id": 1}}`,
89+
})
90+
}
91+
92+
// Run the scenario through the cdcTest harness, which supplies the test
// server and feed factory passed to testFn.
cdcTest(t, testFn)
93+
}

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,9 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
360360
// Column families are stored contiguously, so we'll get
361361
// events for each one even if we're not watching them all.
362362
if errors.Is(err, cdcevent.ErrUnwatchedFamily) {
363+
// Release the event's allocation since we're not processing it.
364+
a := ev.DetachAlloc()
365+
a.Release(ctx)
363366
return nil
364367
}
365368
return err
@@ -376,6 +379,9 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
376379
// Column families are stored contiguously, so we'll get
377380
// events for each one even if we're not watching them all.
378381
if errors.Is(err, cdcevent.ErrUnwatchedFamily) {
382+
// Release the event's allocation since we're not processing it.
383+
a := ev.DetachAlloc()
384+
a.Release(ctx)
379385
return nil
380386
}
381387
return err

0 commit comments

Comments
 (0)