Skip to content

Commit 6d745ce

Browse files
committed
changefeedccl: use new watcher functionality for empty tableset
This patch uses new functionality for the tableset watcher when hibernating the changefeed on an empty tableset. It passes the changefeed description's filter directly to the watcher for empty feeds, rather than deconstructing to get the table names from the FQN. It also replaces PopUnchangedUpTo with the newer PopUpTo. Fixes: #159275 Epic: CRDB-1421 Release note: None
1 parent 237db15 commit 6d745ce

File tree

1 file changed

+15
-57
lines changed

1 file changed

+15
-57
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 15 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ import (
4545
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
4646
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
4747
"github.com/cockroachdb/cockroach/pkg/sql/isql"
48-
"github.com/cockroachdb/cockroach/pkg/sql/parser"
4948
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
5049
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
5150
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
@@ -1414,43 +1413,6 @@ func requiresTopicInValue(s Sink) bool {
14141413
return s.getConcreteType() == sinkTypeWebhook
14151414
}
14161415

1417-
// buildDatabaseWatcherFilterFromSpec constructs a tableset watcher filter from a
1418-
// database-level changefeed target specification.
1419-
func buildDatabaseWatcherFilterFromSpec(
1420-
spec jobspb.ChangefeedTargetSpecification,
1421-
) (tableset.Filter, error) {
1422-
filter := tableset.Filter{
1423-
DatabaseID: spec.DescID,
1424-
}
1425-
if spec.FilterList != nil && len(spec.FilterList.Tables) > 0 {
1426-
switch spec.FilterList.FilterType {
1427-
case tree.IncludeFilter:
1428-
filter.IncludeTables = make(map[string]struct{})
1429-
for fqTableName := range spec.FilterList.Tables {
1430-
// TODO(#156859): use fully qualified names once watcher supports it.
1431-
// Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table")
1432-
tn, err := parser.ParseQualifiedTableName(fqTableName)
1433-
if err != nil {
1434-
return tableset.Filter{}, errors.Wrapf(err, "failed to parse name in filter list: %s", fqTableName)
1435-
}
1436-
filter.IncludeTables[tn.Object()] = struct{}{}
1437-
}
1438-
case tree.ExcludeFilter:
1439-
filter.ExcludeTables = make(map[string]struct{})
1440-
for fqTableName := range spec.FilterList.Tables {
1441-
// TODO(#156859): use fully qualified names once watcher supports it.
1442-
// Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table")
1443-
tn, err := parser.ParseQualifiedTableName(fqTableName)
1444-
if err != nil {
1445-
return tableset.Filter{}, errors.Wrapf(err, "failed to parse name in filter list: %s", fqTableName)
1446-
}
1447-
filter.ExcludeTables[tn.Object()] = struct{}{}
1448-
}
1449-
}
1450-
}
1451-
return filter, nil
1452-
}
1453-
14541416
var errDoneWatching = errors.New("done watching")
14551417

14561418
func makeChangefeedDescription(
@@ -1856,12 +1818,15 @@ func (b *changefeedResumer) resumeWithRetries(
18561818
waitForTables := isDBLevelChangefeed(details) && targets.NumUniqueTables() == 0
18571819
if waitForTables {
18581820
// Create a watcher for the database.
1859-
filter, err := buildDatabaseWatcherFilterFromSpec(details.TargetSpecifications[0])
1821+
filter := tableset.Filter{
1822+
DatabaseID: details.TargetSpecifications[0].DescID,
1823+
TableFilter: *details.TargetSpecifications[0].FilterList,
1824+
}
1825+
1826+
watcher, err = tableset.NewWatcher(ctx, filter, execCfg, watcherMemMonitor, int64(jobID))
18601827
if err != nil {
18611828
return err
18621829
}
1863-
1864-
watcher = tableset.NewWatcher(filter, execCfg, watcherMemMonitor, int64(jobID))
18651830
g.GoCtx(func(ctx context.Context) error {
18661831
// This method runs the watcher until its context is canceled.
18671832
// The watcher context is canceled when diffs are sent to the
@@ -1890,27 +1855,20 @@ func (b *changefeedResumer) resumeWithRetries(
18901855
if err != nil {
18911856
return err
18921857
}
1893-
watchLoop:
18941858
for ts := schemaTS; ctx.Err() == nil; ts = ts.AddDuration(*frequency) {
1895-
unchanged, diffs, err := watcher.PopUnchangedUpTo(ctx, ts)
1859+
added, err := watcher.PopUpTo(ctx, ts)
18961860
if err != nil {
18971861
return err
18981862
}
1899-
if !unchanged {
1900-
// todo(#156874): once watcher gives us only adds,
1901-
// we can safely take the first diff.
1902-
for _, diff := range diffs {
1903-
if diff.Added.ID != 0 {
1904-
schemaTS = diff.AsOf
1905-
initialHighWater = diff.AsOf
1906-
targets, err = AllTargets(ctx, details, execCfg, schemaTS)
1907-
if err != nil {
1908-
return err
1909-
}
1910-
cancelWatcher(errDoneWatching)
1911-
break watchLoop
1912-
}
1863+
if len(added) > 0 {
1864+
schemaTS = added[0].AsOf
1865+
initialHighWater = added[0].AsOf
1866+
targets, err = AllTargets(ctx, details, execCfg, schemaTS)
1867+
if err != nil {
1868+
return err
19131869
}
1870+
cancelWatcher(errDoneWatching)
1871+
break
19141872
}
19151873
}
19161874
}

0 commit comments

Comments
 (0)