Skip to content

Commit dc55dfb

Browse files
committed
changefeedccl: db-level feeds: fail the feed if the watched db is dropped
This patch fails a db-level changefeed when its watched database is dropped. Epic: CRDB-55919 Fixes: #156780 Release note (sql change): A database-level changefeed will now fail when its watched database is dropped.
1 parent 6d745ce commit dc55dfb

File tree

2 files changed

+51
-0
lines changed

2 files changed

+51
-0
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13483,3 +13483,38 @@ func TestChangefeedWatcherCleanupOnStop(t *testing.T) {
1348313483

1348413484
cdcTest(t, testFn, feedTestEnterpriseSinks)
1348513485
}
13486+
13487+
func TestDatabaseLevelChangefeedDropDatabase(t *testing.T) {
13488+
defer leaktest.AfterTest(t)()
13489+
defer log.Scope(t).Close(t)
13490+
13491+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
13492+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
13493+
sqlDB.Exec(t, `CREATE DATABASE db`)
13494+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
13495+
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
13496+
feed := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
13497+
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
13498+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning)
13499+
sqlDB.Exec(t, `DROP DATABASE db`)
13500+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateFailed)
13501+
require.NoError(t, feed.Close())
13502+
require.Error(t, enterpriseFeed.FetchTerminalJobErr())
13503+
}
13504+
cdcTest(t, testFn, feedTestEnterpriseSinks, withAllowChangefeedErr(("feed should fail when database is dropped")))
13505+
13506+
testEmptyTablesetFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
13507+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
13508+
sqlDB.Exec(t, `CREATE DATABASE db`)
13509+
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
13510+
feed := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db WITH hibernation_polling_frequency='1s'`)
13511+
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
13512+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning)
13513+
time.Sleep(5 * time.Second)
13514+
sqlDB.Exec(t, `DROP DATABASE db`)
13515+
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateFailed)
13516+
require.NoError(t, feed.Close())
13517+
require.Error(t, enterpriseFeed.FetchTerminalJobErr())
13518+
}
13519+
cdcTest(t, testEmptyTablesetFn, feedTestEnterpriseSinks, withAllowChangefeedErr(("feed should fail when database is dropped")))
13520+
}

pkg/ccl/changefeedccl/tableset/watcher.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,22 @@ func (w *Watcher) Start(ctx context.Context, initialTS hlc.Timestamp) (retErr er
323323
if !ok {
324324
return nil
325325
}
326+
} else {
327+
nameInfo, err := catalogkeys.DecodeNameMetadataKey(w.execCfg.Codec, kv.Key)
328+
if err != nil {
329+
return err
330+
}
331+
332+
// Database-level namespace entry tombstone.
333+
if nameInfo.ParentID == 0 && nameInfo.ParentSchemaID == 0 &&
334+
tree.Name(nameInfo.Name) == w.dbName {
335+
// Fail the watcher/changefeed: the target database entry was removed.
336+
return changefeedbase.WithTerminalError(
337+
errors.Newf("target database %q (id %d) dropped", w.dbName, w.filter.DatabaseID),
338+
)
339+
}
340+
341+
return nil
326342
}
327343

328344
if log.V(2) {

0 commit comments

Comments
 (0)