From 39711f3f613ecf53c29e821b455661d14c2fa648 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 5 Jan 2026 09:13:22 +0000 Subject: [PATCH 1/7] init --- api/v2/changefeed.go | 23 +++++++++-- cmd/cdc/cli/cli_changefeed_create.go | 2 +- cmd/cdc/cli/cli_changefeed_update.go | 60 +++++++++++++++++++++++++++- 3 files changed, 80 insertions(+), 5 deletions(-) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index d344726663..b293005725 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -221,7 +221,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { return } - ineligibleTables, _, err := getVerifiedTables(ctx, replicaCfg, kvStorage, cfg.StartTs, scheme, topic, protocol) + ineligibleTables, eligibleTables, err := getVerifiedTables(ctx, replicaCfg, kvStorage, cfg.StartTs, scheme, topic, protocol) if err != nil { _ = c.Error(err) return @@ -284,7 +284,10 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ChangefeedID.Name()), zap.String("state", string(info.State)), - zap.String("changefeedInfo", info.String())) + zap.String("changefeedInfo", info.String()), + zap.Int("eligibleTablesLength", len(eligibleTables)), + zap.Int("ineligibleTablesLength", len(ineligibleTables)), + ) c.JSON(getStatus(c), CfInfoToAPIModel( info, @@ -812,6 +815,10 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { return } + var ( + ineligibleTables []string + eligibleTables []string + ) if configUpdated || sinkURIUpdated { // verify replicaConfig sinkURIParsed, err := url.Parse(oldCfInfo.SinkURI) @@ -845,7 +852,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { } // use checkpointTs get snapshot from kv storage - ineligibleTables, _, err := getVerifiedTables(ctx, oldCfInfo.Config, kvStorage, status.CheckpointTs, scheme, topic, protocol) + ineligibleTables, eligibleTables, err = getVerifiedTables(ctx, oldCfInfo.Config, kvStorage, status.CheckpointTs, scheme, topic, protocol) if err != nil { _ = c.Error(errors.ErrChangefeedUpdateRefused.GenWithStackByCause(err)) return @@ -870,6 +877,16 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { return } + log.Info("Update changefeed successfully!", + zap.String("id", oldCfInfo.ChangefeedID.Name()), + zap.String("state", string(oldCfInfo.State)), + zap.String("changefeedInfo", oldCfInfo.String()), + zap.Bool("configUpdated", configUpdated), + zap.Bool("sinkURIUpdated", sinkURIUpdated), + zap.Int("eligibleTablesLength", len(eligibleTables)), + zap.Int("ineligibleTablesLength", len(ineligibleTables)), + ) + c.JSON(getStatus(c), CfInfoToAPIModel(oldCfInfo, status, nil)) } diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go index eac6d0e2fe..9c3625ddfb 100644 --- a/cmd/cdc/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -337,7 +337,7 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e if err != nil { return err } - cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\n", info.ID, infoStr) + cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", info.ID, infoStr, tables.IneligibleTables, tables.EligibleTables) return nil } diff --git a/cmd/cdc/cli/cli_changefeed_update.go b/cmd/cdc/cli/cli_changefeed_update.go index a704e70f61..79368d2303 100644 --- a/cmd/cdc/cli/cli_changefeed_update.go +++ b/cmd/cdc/cli/cli_changefeed_update.go @@ -15,6 +15,7 @@ package cli import ( "encoding/json" + "fmt" "strings" "github.com/pingcap/log" @@ -22,6 +23,7 @@ import ( "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" + "github.com/pingcap/ticdc/pkg/filter" putil "github.com/pingcap/ticdc/pkg/util" "github.com/r3labs/diff" "github.com/spf13/cobra" @@ -128,6 +130,61 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { } changefeedConfig := o.getChangefeedConfig(cmd, newInfo) + + verifyTableConfig := &v2.VerifyTableConfig{ + PDConfig: v2.PDConfig{ + PDAddrs: changefeedConfig.PDAddrs, + CAPath: changefeedConfig.CAPath, + CertPath: changefeedConfig.CertPath, + KeyPath: changefeedConfig.KeyPath, + CertAllowedCN: changefeedConfig.CertAllowedCN, + }, + ReplicaConfig: changefeedConfig.ReplicaConfig, + StartTs: changefeedConfig.StartTs, + SinkURI: changefeedConfig.SinkURI, + } + + tables, err := o.apiV2Client.Changefeeds().VerifyTable(ctx, verifyTableConfig, o.keyspace) + if err != nil { + if strings.Contains(err.Error(), "ErrInvalidIgnoreEventType") { + supportedEventTypes := filter.SupportedEventTypes() + eventTypesStr := make([]string, 0, len(supportedEventTypes)) + for _, eventType := range supportedEventTypes { + eventTypesStr = append(eventTypesStr, string(eventType)) + } + cmd.Println(fmt.Sprintf("Invalid input, 'ignore-event' parameters can only accept [%s]", + strings.Join(eventTypesStr, ", "))) + } + return err + } + + ignoreIneligibleTables := false + if len(tables.IneligibleTables) != 0 { + if putil.GetOrZero(newInfo.Config.ForceReplicate) { + cmd.Printf("[WARN] Force to replicate some ineligible tables, "+ + "these tables do not have a primary key or a not-null unique key: %#v\n"+ + "[WARN] This may cause data redundancy, "+ + "please refer to the official documentation for details.\n", + tables.IneligibleTables) + } else { + cmd.Printf("[WARN] Some tables are not eligible to replicate, "+ + "because they do not have a primary key or a not-null unique key: %#v\n", + tables.IneligibleTables) + if !o.commonChangefeedOptions.noConfirm { + ignoreIneligibleTables, err = confirmIgnoreIneligibleTables(cmd) + if err != nil { + return err + } + } + } + } + + if o.commonChangefeedOptions.noConfirm { + ignoreIneligibleTables = true + } + + changefeedConfig.ReplicaConfig.IgnoreIneligibleTable = putil.AddressOf(ignoreIneligibleTables) + info, err := o.apiV2Client.Changefeeds().Update(ctx, changefeedConfig, o.keyspace, o.changefeedID) if err != nil { return err @@ -136,8 +193,9 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { if err != nil { return err } + cmd.Printf("Update changefeed config successfully! "+ - "\nID: %s\nInfo: %s\n", o.changefeedID, infoStr) + "\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", o.changefeedID, infoStr, tables.IneligibleTables, tables.EligibleTables) return nil } From a79c49ad4a203950497c8d28d231a96c3eab621f Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 12 Jan 2026 08:50:48 +0000 Subject: [PATCH 2/7] add verbose --- cmd/cdc/cli/cli_changefeed_create.go | 6 ++++++ cmd/cdc/cli/cli_changefeed_update.go | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go index 9c3625ddfb..43515f9afa 100644 --- a/cmd/cdc/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -106,6 +106,7 @@ type createChangefeedOptions struct { disableGCSafePointCheck bool startTs uint64 timezone string + verbose bool cfg *config.ReplicaConfig } @@ -126,6 +127,7 @@ func (o *createChangefeedOptions) addFlags(cmd *cobra.Command) { cmd.PersistentFlags().BoolVarP(&o.disableGCSafePointCheck, "disable-gc-check", "", false, "Disable GC safe point check") cmd.PersistentFlags().Uint64Var(&o.startTs, "start-ts", 0, "Start ts of changefeed") cmd.PersistentFlags().StringVar(&o.timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)") + cmd.PersistentFlags().BoolVar(&o.verbose, "verbose", false, "Print verbose information during creating changefeed") // we don't support specify these flags below when cdc version >= 6.2.0 _ = cmd.PersistentFlags().MarkHidden("tz") } @@ -338,6 +340,10 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e return err } cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", info.ID, infoStr, tables.IneligibleTables, tables.EligibleTables) + if o.verbose { + cmd.Printf("EligibleTables: %v\n", tables.EligibleTables) + cmd.Printf("IneligibleTablesCount: %v\n", tables.IneligibleTables) + } return nil } diff --git a/cmd/cdc/cli/cli_changefeed_update.go b/cmd/cdc/cli/cli_changefeed_update.go index 79368d2303..f7f27fe33f 100644 --- a/cmd/cdc/cli/cli_changefeed_update.go +++ b/cmd/cdc/cli/cli_changefeed_update.go @@ -38,6 +38,7 @@ type updateChangefeedOptions struct { commonChangefeedOptions *changefeedCommonOptions changefeedID string keyspace string + verbose bool } // newUpdateChangefeedOptions creates new options for the `cli changefeed update` command. @@ -53,6 +54,7 @@ func (o *updateChangefeedOptions) addFlags(cmd *cobra.Command) { o.commonChangefeedOptions.addFlags(cmd) cmd.PersistentFlags().StringVarP(&o.keyspace, "keyspace", "k", "default", "Replication task (changefeed) Keyspace") cmd.PersistentFlags().StringVarP(&o.changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID") + cmd.PersistentFlags().BoolVar(&o.verbose, "verbose", false, "Print verbose information during updating changefeed") _ = cmd.MarkPersistentFlagRequired("changefeed-id") } @@ -197,6 +199,10 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { cmd.Printf("Update changefeed config successfully! "+ "\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", o.changefeedID, infoStr, tables.IneligibleTables, tables.EligibleTables) + if o.verbose { + cmd.Printf("EligibleTables: %v\n", tables.EligibleTables) + cmd.Printf("IneligibleTablesCount: %v\n", tables.IneligibleTables) + } return nil } From 3bd3af65442a73f1a5839ecad453b6ad3c320621 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 12 Jan 2026 09:34:11 +0000 Subject: [PATCH 3/7] move table count metric to summary --- metrics/grafana/ticdc_new_arch.json | 336 +++++++++--------- metrics/grafana/ticdc_new_arch_next_gen.json | 336 +++++++++--------- .../ticdc_new_arch_with_keyspace_name.json | 206 +++++------ 3 files changed, 439 insertions(+), 439 deletions(-) diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index 7c7f6d3533..e0e9c3840e 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -1088,6 +1088,109 @@ "yaxis": { "align": false } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The total number of tables", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 26 + }, + "hiddenSeries": false, + "id": 22271, + "legend": { + "alignAsTable": false, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": false + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(ticdc_scheduler_table_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", namespace=~\"$namespace\",changefeed=~\"$changefeed\"}) by (namespace,changefeed,mode)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{namespace}}-{{changefeed}}-{{mode}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Table Count", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Summary", @@ -3394,70 +3497,70 @@ "show": false } ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, - { - "datasource": "${DS_TEST-CLUSTER}", - "description": "Build metadata of each TiCDC server instance.", - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "gridPos": { - "h": 9, - "w": 12, - "x": 12, - "y": 31 - }, - "id": 22473, - "options": { - "showHeader": true, - "sortBy": [] - }, - "pluginVersion": "7.5.17", - "transformations": [ - { - "id": "labelsToFields", - "options": {} - }, - { - "id": "organize", - "options": { - "excludeByName": { - "Metric": true, - "Time": true, - "Value": true, - "__name__": true - }, - "indexByName": { - "instance": 0, - "kernel_type": 1, - "git_hash": 2, - "release_version": 3, - "utc_build_time": 4 - }, - "renameByName": {} - } - } - ], - "targets": [ - { - "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", - "format": "time_series", - "instant": true, - "refId": "A" - } - ], - "title": "Build Info", - "type": "table" - } - ], - "title": "Server", - "type": "row" - }, + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Build metadata of each TiCDC server instance.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 31 + }, + "id": 22473, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "instance": 0, + "kernel_type": 1, + "git_hash": 2, + "release_version": 3, + "utc_build_time": 4 + }, + "renameByName": {} + } + } + ], + "targets": [ + { + "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Build Info", + "type": "table" + } + ], + "title": "Server", + "type": "row" + }, { "collapsed": true, "datasource": null, @@ -16032,109 +16135,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "description": "The total number of tables", - "fieldConfig": { - "defaults": { - "links": [] - }, - "overrides": [] - }, - "fill": 0, - "fillGradient": 0, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 105 - }, - "hiddenSeries": false, - "id": 22271, - "legend": { - "alignAsTable": false, - "avg": false, - "current": true, - "max": true, - "min": false, - "rightSide": false, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "options": { - "alertThreshold": false - }, - "paceLength": 10, - "percentage": false, - "pluginVersion": "7.5.17", - "pointradius": 2, - "points": true, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "exemplar": true, - "expr": "sum(ticdc_scheduler_table_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", namespace=~\"$namespace\",changefeed=~\"$changefeed\"}) by (namespace,changefeed,mode)", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{namespace}}-{{changefeed}}-{{mode}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Table Count", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -23961,4 +23961,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 36 -} +} \ No newline at end of file diff --git a/metrics/grafana/ticdc_new_arch_next_gen.json b/metrics/grafana/ticdc_new_arch_next_gen.json index 46d38be2ab..de6a4a9509 100644 --- a/metrics/grafana/ticdc_new_arch_next_gen.json +++ b/metrics/grafana/ticdc_new_arch_next_gen.json @@ -1088,6 +1088,109 @@ "yaxis": { "align": false } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The total number of tables", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 26 + }, + "hiddenSeries": false, + "id": 22271, + "legend": { + "alignAsTable": false, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": false + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(ticdc_scheduler_table_count{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", instance=~\"$ticdc_instance\", keyspace_name=~\"$keyspace_name\",changefeed=~\"$changefeed\"}) by (keyspace_name,changefeed,mode)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{keyspace_name}}-{{changefeed}}-{{mode}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Table Count", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Summary", @@ -3394,70 +3497,70 @@ "show": false } ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, - { - "datasource": "${DS_TEST-CLUSTER}", - "description": "Build metadata of each TiCDC server instance.", - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "gridPos": { - "h": 9, - "w": 12, - "x": 12, - "y": 31 - }, - "id": 22473, - "options": { - "showHeader": true, - "sortBy": [] - }, - "pluginVersion": "7.5.17", - "transformations": [ - { - "id": "labelsToFields", - "options": {} - }, - { - "id": "organize", - "options": { - "excludeByName": { - "Metric": true, - "Time": true, - "Value": true, - "__name__": true - }, - "indexByName": { - "instance": 0, - "kernel_type": 1, - "git_hash": 2, - "release_version": 3, - "utc_build_time": 4 - }, - "renameByName": {} - } - } - ], - "targets": [ - { - "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", - "format": "time_series", - "instant": true, - "refId": "A" - } - ], - "title": "Build Info", - "type": "table" - } - ], - "title": "Server", - "type": "row" - }, + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "Build metadata of each TiCDC server instance.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 31 + }, + "id": 22473, + "options": { + "showHeader": true, + "sortBy": [] + }, + "pluginVersion": "7.5.17", + "transformations": [ + { + "id": "labelsToFields", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "Value": true, + "__name__": true + }, + "indexByName": { + "instance": 0, + "kernel_type": 1, + "git_hash": 2, + "release_version": 3, + "utc_build_time": 4 + }, + "renameByName": {} + } + } + ], + "targets": [ + { + "expr": "max by (instance, kernel_type, git_hash, release_version, utc_build_time) (ticdc_server_build_info{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", job=~\".*ticdc.*\", instance=~\"$ticdc_instance\"})", + "format": "time_series", + "instant": true, + "refId": "A" + } + ], + "title": "Build Info", + "type": "table" + } + ], + "title": "Server", + "type": "row" + }, { "collapsed": true, "datasource": null, @@ -16032,109 +16135,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "description": "The total number of tables", - "fieldConfig": { - "defaults": { - "links": [] - }, - "overrides": [] - }, - "fill": 0, - "fillGradient": 0, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 105 - }, - "hiddenSeries": false, - "id": 22271, - "legend": { - "alignAsTable": false, - "avg": false, - "current": true, - "max": true, - "min": false, - "rightSide": false, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "options": { - "alertThreshold": false - }, - "paceLength": 10, - "percentage": false, - "pluginVersion": "7.5.17", - "pointradius": 2, - "points": true, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "exemplar": true, - "expr": "sum(ticdc_scheduler_table_count{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", instance=~\"$ticdc_instance\", keyspace_name=~\"$keyspace_name\",changefeed=~\"$changefeed\"}) by (keyspace_name,changefeed,mode)", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{keyspace_name}}-{{changefeed}}-{{mode}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Table Count", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -23961,4 +23961,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 36 -} +} \ No newline at end of file diff --git a/metrics/grafana/ticdc_new_arch_with_keyspace_name.json b/metrics/grafana/ticdc_new_arch_with_keyspace_name.json index 2c7143cdae..d1359962ff 100644 --- a/metrics/grafana/ticdc_new_arch_with_keyspace_name.json +++ b/metrics/grafana/ticdc_new_arch_with_keyspace_name.json @@ -899,6 +899,109 @@ "yaxis": { "align": false } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The total number of tables", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 26 + }, + "hiddenSeries": false, + "id": 22271, + "legend": { + "alignAsTable": false, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": false + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(ticdc_scheduler_table_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", keyspace_name=~\"$keyspace_name\",changefeed=~\"$changefeed\"}) by (keyspace_name,changefeed,mode)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{keyspace_name}}-{{changefeed}}-{{mode}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Table Count", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Summary", @@ -7326,109 +7429,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "description": "The total number of tables", - "fieldConfig": { - "defaults": { - "links": [] - }, - "overrides": [] - }, - "fill": 0, - "fillGradient": 0, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 105 - }, - "hiddenSeries": false, - "id": 22271, - "legend": { - "alignAsTable": false, - "avg": false, - "current": true, - "max": true, - "min": false, - "rightSide": false, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "options": { - "alertThreshold": false - }, - "paceLength": 10, - "percentage": false, - "pluginVersion": "7.5.17", - "pointradius": 2, - "points": true, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "exemplar": true, - "expr": "sum(ticdc_scheduler_table_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", keyspace_name=~\"$keyspace_name\",changefeed=~\"$changefeed\"}) by (keyspace_name,changefeed,mode)", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{keyspace_name}}-{{changefeed}}-{{mode}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Table Count", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, From 031a56a11daf22c56e77cf950daf23049e5fb613 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 13 Jan 2026 06:18:18 +0000 Subject: [PATCH 4/7] update --- api/v2/changefeed.go | 35 ++++++++++++++++ cmd/cdc/cli/cli_changefeed_create.go | 2 +- cmd/cdc/cli/cli_changefeed_update.go | 63 +--------------------------- 3 files changed, 38 insertions(+), 62 deletions(-) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index b293005725..4dea51b692 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -712,12 +712,47 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { } }() + sinkURIParsed, err := url.Parse(cfInfo.SinkURI) + if err != nil { + _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI)) + return + } + scheme := sinkURIParsed.Scheme + topic := "" + if config.IsMQScheme(scheme) { + topic, err = helper.GetTopic(sinkURIParsed) + if err != nil { + _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI)) + return + } + } + protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(cfInfo.Config.Sink.Protocol)) + + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) + kvStorage, err := keyspaceManager.GetStorage(ctx, keyspaceName) + if err != nil { + _ = c.Error(err) + return + } + ineligibleTables, eligibleTables, err := getVerifiedTables(ctx, cfInfo.Config, kvStorage, newCheckpointTs, scheme, topic, protocol) + if err != nil { + _ = c.Error(err) + return + } + err = co.ResumeChangefeed(ctx, cfInfo.ChangefeedID, newCheckpointTs, cfg.OverwriteCheckpointTs != 0) if err != nil { _ = c.Error(err) return } c.Errors = nil + log.Info("Resume changefeed successfully!", + zap.String("id", cfInfo.ChangefeedID.Name()), + zap.String("state", string(cfInfo.State)), + zap.String("changefeedInfo", cfInfo.String()), + zap.Int("eligibleTablesLength", len(eligibleTables)), + zap.Int("ineligibleTablesLength", len(ineligibleTables)), + ) c.JSON(getStatus(c), &EmptyResponse{}) } diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go index 43515f9afa..b5b8c4648c 100644 --- a/cmd/cdc/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -339,7 +339,7 @@ func (o *createChangefeedOptions) run(ctx context.Context, cmd *cobra.Command) e if err != nil { return err } - cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", info.ID, infoStr, tables.IneligibleTables, tables.EligibleTables) + cmd.Printf("Create changefeed successfully!\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", info.ID, infoStr, len(tables.IneligibleTables), len(tables.EligibleTables)) if o.verbose { cmd.Printf("EligibleTables: %v\n", tables.EligibleTables) cmd.Printf("IneligibleTablesCount: %v\n", tables.IneligibleTables) diff --git a/cmd/cdc/cli/cli_changefeed_update.go b/cmd/cdc/cli/cli_changefeed_update.go index f7f27fe33f..5adfd02122 100644 --- a/cmd/cdc/cli/cli_changefeed_update.go +++ b/cmd/cdc/cli/cli_changefeed_update.go @@ -15,7 +15,6 @@ package cli import ( "encoding/json" - "fmt" "strings" "github.com/pingcap/log" @@ -23,7 +22,6 @@ import ( "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - "github.com/pingcap/ticdc/pkg/filter" putil "github.com/pingcap/ticdc/pkg/util" "github.com/r3labs/diff" "github.com/spf13/cobra" @@ -106,6 +104,7 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { return err } // sink uri is not changed, set old to empty to skip diff + // old sink uri may contain sensitive information, the password part is masked if newInfo.SinkURI == "" { old.SinkURI = "" } @@ -133,60 +132,6 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { changefeedConfig := o.getChangefeedConfig(cmd, newInfo) - verifyTableConfig := &v2.VerifyTableConfig{ - PDConfig: v2.PDConfig{ - PDAddrs: changefeedConfig.PDAddrs, - CAPath: changefeedConfig.CAPath, - CertPath: changefeedConfig.CertPath, - KeyPath: changefeedConfig.KeyPath, - CertAllowedCN: changefeedConfig.CertAllowedCN, - }, - ReplicaConfig: changefeedConfig.ReplicaConfig, - StartTs: changefeedConfig.StartTs, - SinkURI: changefeedConfig.SinkURI, - } - - tables, err := o.apiV2Client.Changefeeds().VerifyTable(ctx, verifyTableConfig, o.keyspace) - if err != nil { - if strings.Contains(err.Error(), "ErrInvalidIgnoreEventType") { - supportedEventTypes := filter.SupportedEventTypes() - eventTypesStr := make([]string, 0, len(supportedEventTypes)) - for _, eventType := range supportedEventTypes { - eventTypesStr = append(eventTypesStr, string(eventType)) - } - cmd.Println(fmt.Sprintf("Invalid input, 'ignore-event' parameters can only accept [%s]", - strings.Join(eventTypesStr, ", "))) - } - return err - } - - ignoreIneligibleTables := false - if len(tables.IneligibleTables) != 0 { - if putil.GetOrZero(newInfo.Config.ForceReplicate) { - cmd.Printf("[WARN] Force to replicate some ineligible tables, "+ - "these tables do not have a primary key or a not-null unique key: %#v\n"+ - "[WARN] This may cause data redundancy, "+ - "please refer to the official documentation for details.\n", - tables.IneligibleTables) - } else { - cmd.Printf("[WARN] Some tables are not eligible to replicate, "+ - "because they do not have a primary key or a not-null unique key: %#v\n", - tables.IneligibleTables) - if !o.commonChangefeedOptions.noConfirm { - ignoreIneligibleTables, err = confirmIgnoreIneligibleTables(cmd) - if err != nil { - return err - } - } - } - } - - if o.commonChangefeedOptions.noConfirm { - ignoreIneligibleTables = true - } - - changefeedConfig.ReplicaConfig.IgnoreIneligibleTable = putil.AddressOf(ignoreIneligibleTables) - info, err := o.apiV2Client.Changefeeds().Update(ctx, changefeedConfig, o.keyspace, o.changefeedID) if err != nil { return err @@ -197,12 +142,8 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { } cmd.Printf("Update changefeed config successfully! "+ - "\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", o.changefeedID, infoStr, tables.IneligibleTables, tables.EligibleTables) + "\nID: %s\nInfo: %s", o.changefeedID, infoStr) - if o.verbose { - cmd.Printf("EligibleTables: %v\n", tables.EligibleTables) - cmd.Printf("IneligibleTablesCount: %v\n", tables.IneligibleTables) - } return nil } From e957e9ed52267ce4ebe78ef2f4c4b3dda9b7eb19 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 13 Jan 2026 10:47:10 +0000 Subject: [PATCH 5/7] update --- api/v2/api.go | 2 + api/v2/changefeed.go | 111 +++++++++++++++++++-------- cmd/cdc/cli/cli_changefeed_resume.go | 45 ++++++++++- cmd/cdc/cli/cli_changefeed_update.go | 42 +++++++++- pkg/api/v2/changefeed.go | 16 ++++ pkg/api/v2/mock/changefeed_mock.go | 9 +++ 6 files changed, 188 insertions(+), 37 deletions(-) diff --git a/api/v2/api.go b/api/v2/api.go index 1ee5bcb061..d48345ee5e 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -86,6 +86,8 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { verifyTableGroup := v2.Group("/verify_table") verifyTableGroup.POST("", api.VerifyTable) + getAllTablesGroup := v2.Group("/get_all_tables") + getAllTablesGroup.POST("", api.GetAllTables) // processor apis // Note: They are not useful in new arch cdc, diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 4dea51b692..5cdea38321 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -355,7 +355,44 @@ func (h *OpenAPIV2) ListChangeFeeds(c *gin.Context) { c.JSON(http.StatusOK, toListResponse(c, commonInfos)) } -// VerifyTable verify table, return ineligibleTables and EligibleTables. +// GetAllTables return ineligibleTables and EligibleTables. +func (h *OpenAPIV2) GetAllTables(c *gin.Context) { + ctx := c.Request.Context() + cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()} + + if err := c.BindJSON(&cfg); err != nil { + _ = c.Error(errors.WrapError(errors.ErrAPIInvalidParam, err)) + return + } + + // fill replicaConfig + replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig() + + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) + keyspaceName := GetKeyspaceValueWithDefault(c) + kvStorage, err := keyspaceManager.GetStorage(ctx, keyspaceName) + if err != nil { + _ = c.Error(err) + return + } + + f, err := filter.NewFilter(replicaCfg.Filter, "", util.GetOrZero(replicaCfg.CaseSensitive), util.GetOrZero(replicaCfg.ForceReplicate)) + if err != nil { + _ = c.Error(err) + return + } + _, ineligibleTables, eligibleTables, err := schemastore. + VerifyTables(f, kvStorage, cfg.StartTs) + if err != nil { + return + } + tables := &Tables{ + IneligibleTables: toAPIModelFunc(ineligibleTables), + EligibleTables: toAPIModelFunc(eligibleTables), + } + c.JSON(http.StatusOK, tables) +} + func (h *OpenAPIV2) VerifyTable(c *gin.Context) { ctx := c.Request.Context() cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()} @@ -408,15 +445,6 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) { zap.Bool("ignoreIneligibleTable", util.GetOrZero(cfg.ReplicaConfig.IgnoreIneligibleTable)), ) - toAPIModelFunc := func(tbls []string) []TableName { - var apiModels []TableName - for _, tbl := range tbls { - apiModels = append(apiModels, TableName{ - Table: tbl, - }) - } - return apiModels - } tables := &Tables{ IneligibleTables: toAPIModelFunc(ineligibleTables), EligibleTables: toAPIModelFunc(eligibleTables), @@ -469,6 +497,16 @@ func shouldShowRunningError(state config.FeedState) bool { } } +func toAPIModelFunc(tbls []string) []TableName { + var apiModels []TableName + for _, tbl := range tbls { + apiModels = append(apiModels, TableName{ + Table: tbl, + }) + } + return apiModels +} + func CfInfoToAPIModel( info *config.ChangeFeedInfo, status *config.ChangeFeedStatus, @@ -669,8 +707,10 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { // If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not. newCheckpointTs := status.CheckpointTs + overwriteCheckpointTs := false if cfg.OverwriteCheckpointTs != 0 { newCheckpointTs = cfg.OverwriteCheckpointTs + overwriteCheckpointTs = true } keyspaceMeta := middleware.GetKeyspaceFromContext(c) @@ -712,35 +752,41 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { } }() - sinkURIParsed, err := url.Parse(cfInfo.SinkURI) - if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI)) - return - } - scheme := sinkURIParsed.Scheme - topic := "" - if config.IsMQScheme(scheme) { - topic, err = helper.GetTopic(sinkURIParsed) + var ( + eligibleTables []string + ineligibleTables []string + ) + if overwriteCheckpointTs { + sinkURIParsed, err := url.Parse(cfInfo.SinkURI) if err != nil { _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI)) return } - } - protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(cfInfo.Config.Sink.Protocol)) + scheme := sinkURIParsed.Scheme + topic := "" + if config.IsMQScheme(scheme) { + topic, err = helper.GetTopic(sinkURIParsed) + if err != nil { + _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI)) + return + } + } + protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(cfInfo.Config.Sink.Protocol)) - keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) - kvStorage, err := keyspaceManager.GetStorage(ctx, keyspaceName) - if err != nil { - _ = c.Error(err) - return - } - ineligibleTables, eligibleTables, err := getVerifiedTables(ctx, cfInfo.Config, kvStorage, newCheckpointTs, scheme, topic, protocol) - if err != nil { - _ = c.Error(err) - return + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) + kvStorage, err := keyspaceManager.GetStorage(ctx, keyspaceName) + if err != nil { + _ = c.Error(err) + return + } + ineligibleTables, eligibleTables, err = getVerifiedTables(ctx, cfInfo.Config, kvStorage, newCheckpointTs, scheme, topic, protocol) + if err != nil { + _ = c.Error(err) + return + } } - err = co.ResumeChangefeed(ctx, cfInfo.ChangefeedID, newCheckpointTs, cfg.OverwriteCheckpointTs != 0) + err = co.ResumeChangefeed(ctx, cfInfo.ChangefeedID, newCheckpointTs, overwriteCheckpointTs) if err != nil { _ = c.Error(err) return @@ -750,6 +796,7 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { zap.String("id", cfInfo.ChangefeedID.Name()), zap.String("state", string(cfInfo.State)), zap.String("changefeedInfo", cfInfo.String()), + zap.Bool("overwriteCheckpointTs", overwriteCheckpointTs), zap.Int("eligibleTablesLength", len(eligibleTables)), zap.Int("ineligibleTablesLength", len(ineligibleTables)), ) diff --git a/cmd/cdc/cli/cli_changefeed_resume.go b/cmd/cdc/cli/cli_changefeed_resume.go index 097dc36e3d..922e4202d1 100644 --- a/cmd/cdc/cli/cli_changefeed_resume.go +++ b/cmd/cdc/cli/cli_changefeed_resume.go @@ -21,15 +21,16 @@ import ( v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/pingcap/ticdc/cmd/util" - apiv2client "github.com/pingcap/ticdc/pkg/api/v2" + apiClient "github.com/pingcap/ticdc/pkg/api/v2" cerror "github.com/pingcap/ticdc/pkg/errors" + putil "github.com/pingcap/ticdc/pkg/util" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" ) // resumeChangefeedOptions defines flags for the `cli changefeed resume` command. type resumeChangefeedOptions struct { - apiClient apiv2client.APIV2Interface + apiClient apiClient.APIV2Interface changefeedID string keyspace string @@ -38,6 +39,7 @@ type resumeChangefeedOptions struct { overwriteCheckpointTs string currentTso *v2.Tso checkpointTs uint64 + verbose bool upstreamPDAddrs string upstreamCaPath string @@ -66,6 +68,7 @@ func (o *resumeChangefeedOptions) addFlags(cmd *cobra.Command) { "Certificate path for TLS connection to upstream") cmd.PersistentFlags().StringVar(&o.upstreamKeyPath, "upstream-key", "", "Private key path for TLS connection to upstream") + cmd.PersistentFlags().BoolVar(&o.verbose, "verbose", false, "Print verbose information during updating changefeed") // we don't support specify there flags below when cdc version <= 6.3.0 _ = cmd.PersistentFlags().MarkHidden("upstream-pd") _ = cmd.PersistentFlags().MarkHidden("upstream-ca") @@ -204,8 +207,44 @@ func (o *resumeChangefeedOptions) run(cmd *cobra.Command) error { if err := o.confirmResumeChangefeedCheck(cmd); err != nil { return err } + tables := &v2.Tables{} + if o.checkpointTs != 0 { + cf, err := o.apiClient.Changefeeds().Get(ctx, o.keyspace, o.changefeedID) + if err != nil { + return err + } + tables, err := o.apiClient.Changefeeds().GetAllTables(ctx, &v2.VerifyTableConfig{ + ReplicaConfig: cf.Config, + StartTs: cf.CheckpointTs, + }, o.keyspace) + if err != nil { + return err + } + if len(tables.IneligibleTables) != 0 { + if putil.GetOrZero(cf.Config.ForceReplicate) { + cmd.Printf("[WARN] Force to replicate some ineligible tables, "+ + "these tables do not have a primary key or a not-null unique key: %#v\n"+ + "[WARN] This may cause data redundancy, "+ + "please refer to the official documentation for details.\n", + tables.IneligibleTables) + } else { + cmd.Printf("[WARN] Some tables are not eligible to replicate, "+ + "because they do not have a primary key or a not-null unique key: %#v\n", + tables.IneligibleTables) + } + } + } err := o.apiClient.Changefeeds().Resume(ctx, cfg, o.keyspace, o.changefeedID) - + if err != nil { + return err + } + // o.checkpointTs != 0 + cmd.Printf("Resume changefeed successfully! "+ + "\nID: %s\nOverwriteCheckpointTs: %t\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", o.changefeedID, o.checkpointTs != 0, len(tables.IneligibleTables), len(tables.EligibleTables)) + if o.verbose { + cmd.Printf("EligibleTables: %v\n", tables.EligibleTables) + cmd.Printf("IneligibleTablesCount: %v\n", tables.IneligibleTables) + } return err } diff --git a/cmd/cdc/cli/cli_changefeed_update.go b/cmd/cdc/cli/cli_changefeed_update.go index 5adfd02122..7cffc1e2af 100644 --- a/cmd/cdc/cli/cli_changefeed_update.go +++ b/cmd/cdc/cli/cli_changefeed_update.go @@ -132,6 +132,41 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { changefeedConfig := o.getChangefeedConfig(cmd, newInfo) + tables, err := o.apiV2Client.Changefeeds().GetAllTables(ctx, &v2.VerifyTableConfig{ + ReplicaConfig: changefeedConfig.ReplicaConfig, + StartTs: newInfo.CheckpointTs, + }, o.keyspace) + if err != nil { + return err + } + + ignoreIneligibleTables := false + if len(tables.IneligibleTables) != 0 { + if putil.GetOrZero(newInfo.Config.ForceReplicate) { + cmd.Printf("[WARN] Force to replicate some ineligible tables, "+ + "these tables do not have a primary key or a not-null unique key: %#v\n"+ + "[WARN] This may cause data redundancy, "+ + "please refer to the official documentation for details.\n", + tables.IneligibleTables) + } else { + cmd.Printf("[WARN] Some tables are not eligible to replicate, "+ + "because they do not have a primary key or a not-null unique key: %#v\n", + tables.IneligibleTables) + if !o.commonChangefeedOptions.noConfirm { + ignoreIneligibleTables, err = confirmIgnoreIneligibleTables(cmd) + if err != nil { + return err + } + } + } + } + + if o.commonChangefeedOptions.noConfirm { + ignoreIneligibleTables = true + } + + changefeedConfig.ReplicaConfig.IgnoreIneligibleTable = putil.AddressOf(ignoreIneligibleTables) + info, err := o.apiV2Client.Changefeeds().Update(ctx, changefeedConfig, o.keyspace, o.changefeedID) if err != nil { return err @@ -142,8 +177,11 @@ func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { } cmd.Printf("Update changefeed config successfully! "+ - "\nID: %s\nInfo: %s", o.changefeedID, infoStr) - + "\nID: %s\nInfo: %s\nIneligibleTablesCount: %d\nEligibleTablesCount: %d\n", info.ID, infoStr, len(tables.IneligibleTables), len(tables.EligibleTables)) + if o.verbose { + cmd.Printf("EligibleTables: %v\n", tables.EligibleTables) + cmd.Printf("IneligibleTablesCount: %v\n", tables.IneligibleTables) + } return nil } diff --git a/pkg/api/v2/changefeed.go b/pkg/api/v2/changefeed.go index 3fa7dff70f..9637ae5279 100644 --- a/pkg/api/v2/changefeed.go +++ b/pkg/api/v2/changefeed.go @@ -36,6 +36,8 @@ type ChangefeedInterface interface { Create(ctx context.Context, cfg *v2.ChangefeedConfig, keyspace string) (*v2.ChangeFeedInfo, error) // VerifyTable verifies table for a changefeed VerifyTable(ctx context.Context, cfg *v2.VerifyTableConfig, keyspace string) (*v2.Tables, error) + // GetAllTables returns eligible and ineligible tables for a changefeed + GetAllTables(ctx context.Context, cfg *v2.VerifyTableConfig, keyspace string) (*v2.Tables, error) // Update updates a changefeed Update(ctx context.Context, cfg *v2.ChangefeedConfig, keyspace string, name string) (*v2.ChangeFeedInfo, error) @@ -98,6 +100,20 @@ func (c *changefeeds) VerifyTable(ctx context.Context, return result, err } +func (c *changefeeds) GetAllTables(ctx context.Context, + cfg *v2.VerifyTableConfig, + keyspace string, +) (*v2.Tables, error) { + result := &v2.Tables{} + u := fmt.Sprintf("get_all_tables?%s=%s", api.APIOpVarKeyspace, keyspace) + err := c.client.Post(). + WithURI(u). + WithBody(cfg). + Do(ctx). + Into(result) + return result, err +} + func (c *changefeeds) Update(ctx context.Context, cfg *v2.ChangefeedConfig, keyspace string, name string, ) (*v2.ChangeFeedInfo, error) { diff --git a/pkg/api/v2/mock/changefeed_mock.go b/pkg/api/v2/mock/changefeed_mock.go index 6e5380c7ca..5039453fc4 100644 --- a/pkg/api/v2/mock/changefeed_mock.go +++ b/pkg/api/v2/mock/changefeed_mock.go @@ -240,6 +240,15 @@ func (m *MockChangefeedInterface) VerifyTable(ctx context.Context, cfg *v2.Verif return ret0, ret1 } +// GetAllTables mocks base method. +func (m *MockChangefeedInterface) GetAllTables(ctx context.Context, cfg *v2.VerifyTableConfig, keyspace string) (*v2.Tables, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllTables", ctx, cfg, keyspace) + ret0, _ := ret[0].(*v2.Tables) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + // VerifyTable indicates an expected call of VerifyTable. func (mr *MockChangefeedInterfaceMockRecorder) VerifyTable(ctx, cfg, keyspace interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() From 66f7f99fe1ecaca8851939ae1a58a78ad21a3ac7 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 15 Jan 2026 03:13:34 +0000 Subject: [PATCH 6/7] update --- pkg/api/v2/mock/changefeed_mock.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/pkg/api/v2/mock/changefeed_mock.go b/pkg/api/v2/mock/changefeed_mock.go index 5039453fc4..d132aa138c 100644 --- a/pkg/api/v2/mock/changefeed_mock.go +++ b/pkg/api/v2/mock/changefeed_mock.go @@ -117,6 +117,21 @@ func (mr *MockChangefeedInterfaceMockRecorder) Get(ctx, keyspace, name interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockChangefeedInterface)(nil).Get), ctx, keyspace, name) } +// GetAllTables mocks base method. +func (m *MockChangefeedInterface) GetAllTables(ctx context.Context, cfg *v2.VerifyTableConfig, keyspace string) (*v2.Tables, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllTables", ctx, cfg, keyspace) + ret0, _ := ret[0].(*v2.Tables) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllTables indicates an expected call of GetAllTables. +func (mr *MockChangefeedInterfaceMockRecorder) GetAllTables(ctx, cfg, keyspace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllTables", reflect.TypeOf((*MockChangefeedInterface)(nil).GetAllTables), ctx, cfg, keyspace) +} + // List mocks base method. func (m *MockChangefeedInterface) List(ctx context.Context, keyspace, state string) ([]v2.ChangefeedCommonInfo, error) { m.ctrl.T.Helper() @@ -240,15 +255,6 @@ func (m *MockChangefeedInterface) VerifyTable(ctx context.Context, cfg *v2.Verif return ret0, ret1 } -// GetAllTables mocks base method. -func (m *MockChangefeedInterface) GetAllTables(ctx context.Context, cfg *v2.VerifyTableConfig, keyspace string) (*v2.Tables, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAllTables", ctx, cfg, keyspace) - ret0, _ := ret[0].(*v2.Tables) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - // VerifyTable indicates an expected call of VerifyTable. func (mr *MockChangefeedInterfaceMockRecorder) VerifyTable(ctx, cfg, keyspace interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() From d9ca7042db920a7fb12f30629f57f3ead3eefb2f Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 16 Jan 2026 06:14:27 +0000 Subject: [PATCH 7/7] fix ut --- cmd/cdc/cli/cli_changefeed_update_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/cdc/cli/cli_changefeed_update_test.go b/cmd/cdc/cli/cli_changefeed_update_test.go index 3a1c961065..1b8e439222 100644 --- a/cmd/cdc/cli/cli_changefeed_update_test.go +++ b/cmd/cdc/cli/cli_changefeed_update_test.go @@ -120,6 +120,8 @@ func TestChangefeedUpdateCli(t *testing.T) { Sink: &v2.SinkConfig{}, }, }, nil) + f.changefeeds.EXPECT().GetAllTables(gomock.Any(), gomock.Any(), "ks"). + Return(&v2.Tables{}, nil) f.changefeeds.EXPECT().Update(gomock.Any(), gomock.Any(), "ks", "abc"). Return(&v2.ChangeFeedInfo{}, nil) dir := t.TempDir()