Skip to content

Commit fc8482b

Browse files
committed
changefeedccl: fix unwatched column families memory monitoring bug
This patch fixes a bug where the memory monitor wouldn't reclaim the memory allocated to events corresponding to unwatched column families for a changefeed that targets only a subset of a table's families.

Release note (bug fix): A bug where a changefeed targeting only a subset of a table's column families could become stuck has been fixed.
1 parent 70ad6fb commit fc8482b

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ go_test(
205205
"alter_changefeed_test.go",
206206
"changefeed_dist_test.go",
207207
"changefeed_job_info_test.go",
208+
"changefeed_memory_test.go",
208209
"changefeed_processors_test.go",
209210
"changefeed_progress_test.go",
210211
"changefeed_stmt_test.go",
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package changefeedccl
7+
8+
import (
9+
"context"
10+
"math/rand/v2"
11+
"testing"
12+
13+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
14+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
15+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
16+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
17+
"github.com/cockroachdb/cockroach/pkg/util/log"
18+
)
19+
20+
// TestChangefeedUnwatchedFamilyMemoryMonitoring verifies that changefeeds
21+
// correctly release memory allocations for events corresponding to unwatched
22+
// column families after discarding them. This is a regression test for #154776.
23+
func TestChangefeedUnwatchedFamilyMemoryMonitoring(t *testing.T) {
24+
defer leaktest.AfterTest(t)()
25+
defer log.Scope(t).Close(t)
26+
27+
ctx := context.Background()
28+
29+
// testFn is the test body; it is handed to cdcTest at the end of this
// function and receives the test server plus the feed factory to use.
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
30+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
31+
32+
// Set a low memory limit.
33+
changefeedbase.PerChangefeedMemLimit.Override(
34+
ctx, &s.Server.ClusterSettings().SV, 1<<20 /* 1 MiB */)
35+
36+
// Create a table with two column families.
37+
sqlDB.Exec(t, `CREATE TABLE foo (
38+
id INT PRIMARY KEY,
39+
a STRING,
40+
b STRING,
41+
FAMILY f1 (id, a),
42+
FAMILY f2 (b)
43+
)`)
44+
45+
// Insert initial data.
46+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a', 'b')`)
47+
48+
// Extra arguments forwarded to the feed helper below. args stays nil
// unless the factory is the webhook one, in which case a single opt-out
// value (with its reason string) is appended.
var args []any
49+
if _, ok := f.(*webhookFeedFactory); ok {
50+
args = append(args, optOutOfMetamorphicEnrichedEnvelope{
51+
reason: "enriched envelopes do not support column families for webhook sinks",
52+
})
53+
}
54+
55+
// Start changefeed watching only f1 with diff enabled.
56+
// Events from f2 should trigger ErrUnwatchedFamily.
57+
// NOTE: the right-hand side calls the feed helper declared outside this
// function; from the next statement on, the local variable feed shadows it.
feed := feed(t, f,
58+
`CREATE CHANGEFEED FOR foo FAMILY f1 WITH diff, initial_scan='no', resolved`, args...)
59+
defer closeFeed(t, feed)
60+
61+
// Update a watched column to generate an event.
62+
sqlDB.Exec(t, `UPDATE foo SET a = 'a_1' WHERE id = 1`)
63+
assertPayloads(t, feed, []string{
64+
`foo.f1: [1]->{"after": {"a": "a_1", "id": 1}, "before": {"a": "a", "id": 1}}`,
65+
})
66+
67+
// Generate a lot of events for the unwatched family. If the memory
68+
// allocations are being leaked, this would cause the changefeed to
69+
// exceed the previously configured 1 MiB limit.
// (1000 updates at ~2 KiB per event is roughly 2 MiB in total, i.e. about
// twice the limit.)
70+
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
71+
for range 1000 {
72+
// Each update will create an event of size ~2 KiB
73+
// (~1 KiB for each of before/after).
74+
data := make([]byte, 1<<10 /* 1 KiB */)
75+
// Fill the buffer with characters picked at random from charset, so
// every iteration writes a fresh 1 KiB value into column b.
for i := range data {
76+
data[i] = charset[rand.IntN(len(charset))]
77+
}
78+
sqlDB.Exec(t, `UPDATE foo SET b = $1 WHERE id = 1`, data)
79+
}
80+
81+
// Update watched column again to verify the feed is still progressing.
82+
// If the memory allocations leaked, this assertion would time out.
83+
sqlDB.Exec(t, `UPDATE foo SET a = 'a_2' WHERE id = 1`)
84+
assertPayloads(t, feed, []string{
85+
`foo.f1: [1]->{"after": {"a": "a_2", "id": 1}, "before": {"a": "a_1", "id": 1}}`,
86+
})
87+
}
88+
89+
// Run the test body through the cdcTest harness.
cdcTest(t, testFn)
90+
}

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,9 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
361361
// Column families are stored contiguously, so we'll get
362362
// events for each one even if we're not watching them all.
363363
if errors.Is(err, cdcevent.ErrUnwatchedFamily) {
364+
// Release the event's allocation since we're not processing it.
365+
a := ev.DetachAlloc()
366+
a.Release(ctx)
364367
return nil
365368
}
366369
return err
@@ -377,6 +380,9 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
377380
// Column families are stored contiguously, so we'll get
378381
// events for each one even if we're not watching them all.
379382
if errors.Is(err, cdcevent.ErrUnwatchedFamily) {
383+
// Release the event's allocation since we're not processing it.
384+
a := ev.DetachAlloc()
385+
a.Release(ctx)
380386
return nil
381387
}
382388
return err

0 commit comments

Comments
 (0)