Skip to content

Commit 5bef842

Browse files
committed
changefeedccl: db-level feeds: fail the feed if the watched db is dropped
This patch fails a db-level changefeed when its watched database is dropped. Epic: CRDB-55919 Fixes: #156780 Release note (sql change): A database-level changefeed will now fail when its watched database is dropped.
1 parent 83d84a5 commit 5bef842

File tree

3 files changed

+64
-17
lines changed

3 files changed

+64
-17
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12842,21 +12842,15 @@ func TestDatabaseLevelChangefeedRenameDatabase(t *testing.T) {
1284212842
sqlDB.Exec(t, `CREATE DATABASE foo;`)
1284312843
sqlDB.Exec(t, `CREATE TABLE foo.bar (id INT PRIMARY KEY);`)
1284412844

12845-
feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE foo`)
12846-
defer closeFeed(t, feed1)
12845+
feed := feed(t, f, `CREATE CHANGEFEED FOR DATABASE foo`)
1284712846

12848-
sqlDB.Exec(t, `INSERT INTO foo.bar VALUES (1);`)
12849-
expectedRows := []string{
12850-
`bar: [1]->{"after": {"id": 1}}`,
12851-
}
12852-
assertPayloads(t, feed1, expectedRows)
12847+
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
12848+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning)
1285312849

1285412850
sqlDB.Exec(t, `ALTER DATABASE foo RENAME TO bar;`)
12855-
sqlDB.Exec(t, `INSERT INTO bar.bar VALUES (2);`)
12856-
expectedRows = []string{
12857-
`bar: [2]->{"after": {"id": 2}}`,
12858-
}
12859-
assertPayloads(t, feed1, expectedRows)
12851+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateFailed)
12852+
require.NoError(t, feed.Close())
12853+
require.Error(t, enterpriseFeed.FetchTerminalJobErr())
1286012854
}
1286112855
// TODO(#152196): Remove feedTestUseRootUserConnection once we have ALTER
1286212856
// DEFAULT PRIVILEGES for databases
@@ -13488,3 +13482,42 @@ func TestChangefeedWatcherCleanupOnStop(t *testing.T) {
1348813482

1348913483
cdcTest(t, testFn, feedTestEnterpriseSinks)
1349013484
}
13485+
13486+
func TestDatabaseLevelChangefeedDropDatabase(t *testing.T) {
13487+
defer leaktest.AfterTest(t)()
13488+
defer log.Scope(t).Close(t)
13489+
13490+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
13491+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
13492+
sqlDB.Exec(t, `CREATE DATABASE db`)
13493+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
13494+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
13495+
feed := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
13496+
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
13497+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning)
13498+
sqlDB.Exec(t, `insert into db.foo values (1, 'test')`)
13499+
assertPayloads(t, feed, []string{
13500+
`foo: [1]->{"after": {"a": 1, "b": "test"}}`,
13501+
})
13502+
sqlDB.Exec(t, `DROP DATABASE db`)
13503+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateFailed)
13504+
require.NoError(t, feed.Close())
13505+
require.Error(t, enterpriseFeed.FetchTerminalJobErr())
13506+
}
13507+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr(("feed should fail when database is dropped")))
13508+
13509+
testEmptyTablesetFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
13510+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
13511+
sqlDB.Exec(t, `CREATE DATABASE db`)
13512+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
13513+
feed := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db WITH hibernation_polling_frequency='1s'`)
13514+
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
13515+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning)
13516+
time.Sleep(5 * time.Second)
13517+
sqlDB.Exec(t, `DROP DATABASE db`)
13518+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateFailed)
13519+
require.NoError(t, feed.Close())
13520+
require.Error(t, enterpriseFeed.FetchTerminalJobErr())
13521+
}
13522+
cdcTest(t, testEmptyTablesetFn, feedTestEnterpriseSinks, withAllowChangefeedErr(("feed should fail when database is dropped")))
13523+
}

pkg/ccl/changefeedccl/tableset/watcher.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,20 @@ func (w *Watcher) Start(ctx context.Context, initialTS hlc.Timestamp) (retErr er
315315
var err error
316316
var ok bool
317317

318+
nameInfo, err := catalogkeys.DecodeNameMetadataKey(w.execCfg.Codec, kv.Key)
319+
if err != nil {
320+
return err
321+
}
322+
323+
// Database-level namespace entry tombstone.
324+
if nameInfo.ParentID == 0 && nameInfo.ParentSchemaID == 0 &&
325+
tree.Name(nameInfo.Name) == w.dbName {
326+
// Fail the watcher/changefeed: the target database entry was removed.
327+
return changefeedbase.WithTerminalError(
328+
errors.Newf("target database %q (id %d) dropped or renamed", w.dbName, w.filter.DatabaseID),
329+
)
330+
}
331+
318332
if kv.Value.IsPresent() {
319333
table, ok, err = w.kvToTable(ctx, roachpb.KeyValue{Key: kv.Key, Value: kv.Value}, dec)
320334
if err != nil {

pkg/ccl/changefeedccl/tableset/watcher_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,12 +381,12 @@ func TestWatcherDropDatabase(t *testing.T) {
381381

382382
db.Exec(t, "DROP DATABASE defaultdb")
383383

384-
adds, err := watcher.PopUpTo(ctx, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
385-
require.NoError(t, err)
386-
assert.Empty(t, adds)
384+
_, err = watcher.PopUpTo(ctx, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
385+
require.Error(t, err)
386+
require.Contains(t, err.Error(), "dropped")
387387

388-
cancel()
389-
require.ErrorIs(t, eg.Wait(), context.Canceled)
388+
err = eg.Wait()
389+
require.Contains(t, err.Error(), "dropped")
390390
}
391391

392392
func getDatabaseID(

0 commit comments

Comments
 (0)