Skip to content

Commit 5c54cd9

Browse files
craig[bot]Yevgeniy Miretskiy
andcommitted
107923: changefeedccl: Correctly handle dropped UDTs r=miretskiy a=miretskiy Schema feed is responsible for the determination of correct schema descriptor corresponding to an event occuring at certain time. Due to the need to use low level API to retrieve historical descriptor versions, it is possible that the schema feed would observe descriptors that were dropped/deleted/truncated. A long outstanding bug in schema feed would incorrectly check for low level value being 'nil' in order to determine if the descriptor was dropped. This check is incorrect since the iterator may return a non-nil empty byte array reprsenting a tombstone. A non-nil, but empty KV value would lead schema feed to incorrectly attempt to deserialize protocol message, resulting in a `value type is not BYTES: UNKNOWN` being returned. This PR fixes the above issue, and adds a test to verify correct behavior. Fixes https://github.com/cockroachlabs/support/issues/2408 Release note (enterprise change): Fix a rare changefeed issue which is triggered when the parent database or types are dropped, and instead of exiting with a descriptive error message, the changefeed would observe an opaque error instead (`value type is not BYTES: UNKNOWN`) Co-authored-by: Yevgeniy Miretskiy <[email protected]>
2 parents fcfa15d + 26f93f6 commit 5c54cd9

File tree

2 files changed

+63
-2
lines changed

2 files changed

+63
-2
lines changed

pkg/ccl/changefeedccl/schemafeed/schema_feed.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -857,12 +857,19 @@ func (tf *schemaFeed) fetchDescriptorVersions(
857857
if err != nil {
858858
return err
859859
}
860-
if unsafeValue == nil {
860+
861+
if len(unsafeValue) == 0 {
862+
if isType {
863+
return changefeedbase.WithTerminalError(
864+
errors.Wrapf(catalog.ErrDescriptorDropped, "type descriptor %d dropped", id))
865+
}
866+
861867
name := origName
862868
if name == "" {
863869
name = changefeedbase.StatementTimeName(fmt.Sprintf("desc(%d)", id))
864870
}
865-
return errors.Errorf(`"%v" was dropped or truncated`, name)
871+
return changefeedbase.WithTerminalError(
872+
errors.Wrapf(catalog.ErrDescriptorDropped, `table "%v"[%d] was dropped or truncated`, name, id))
866873
}
867874

868875
// Unmarshal the descriptor.

pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,3 +262,57 @@ func TestFetchDescriptorVersionsCPULimiterPagination(t *testing.T) {
262262
require.Len(t, desc, 2)
263263
require.Equal(t, 2, numRequests)
264264
}
265+
266+
func TestSchemaFeedHandlesCascadeDatabaseDrop(t *testing.T) {
267+
defer leaktest.AfterTest(t)()
268+
defer log.Scope(t).Close(t)
269+
270+
ctx := context.Background()
271+
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
272+
defer srv.Stopper().Stop(ctx)
273+
s := srv.ApplicationLayer()
274+
sqlServer := s.SQLServer().(*sql.Server)
275+
276+
sqlDB := sqlutils.MakeSQLRunner(db)
277+
278+
beforeCreate := s.Clock().Now()
279+
280+
// Create a database, containing user defined type along with table using that type.
281+
sqlDB.ExecMultiple(t,
282+
`CREATE DATABASE test`,
283+
`USE test`,
284+
`CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`,
285+
`CREATE TABLE foo(a INT, t status DEFAULT 'open')`,
286+
`USE defaultdb`,
287+
)
288+
289+
var targets changefeedbase.Targets
290+
var tableID descpb.ID
291+
sqlDB.QueryRow(t, "SELECT 'test.foo'::regclass::int").Scan(&tableID)
292+
targets.Add(changefeedbase.Target{
293+
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
294+
TableID: tableID,
295+
FamilyName: "primary",
296+
StatementTimeName: "foo",
297+
})
298+
sf := New(ctx, &sqlServer.GetExecutorConfig().DistSQLSrv.ServerConfig,
299+
TestingAllEventFilter, targets, s.Clock().Now(), nil, changefeedbase.CanHandle{
300+
MultipleColumnFamilies: true,
301+
VirtualColumns: true,
302+
}).(*schemaFeed)
303+
304+
// initialize type dependencies in schema feed.
305+
require.NoError(t, sf.primeInitialTableDescs(ctx))
306+
307+
// DROP database with cascade to cause the type along with the table to be dropped.
308+
// Dropped tables are marked as being dropped (i.e. there is an MVCC version of the
309+
// descriptor that has a state indicating that the table is being dropped).
310+
// However, dependent UDTs are simply deleted so, there is an MVCC tombstone for that type.
311+
sqlDB.Exec(t, `DROP DATABASE test CASCADE;`)
312+
313+
// Fetching descriptor versions from before the initial create statement
314+
// up until the current time should result in a catalog.ErrDescriptorDropped error.
315+
_, err := sf.fetchDescriptorVersions(ctx, beforeCreate, s.Clock().Now())
316+
require.True(t, errors.Is(err, catalog.ErrDescriptorDropped),
317+
"expected dropped descriptor error, found: %v", err)
318+
}

0 commit comments

Comments
 (0)