Skip to content

Commit 3b82376

Browse files
zeminzhoupingyuzz-jason
authored
[to #48] modify TiDB CDC to TiKV CDC (#74)
* modify TiCDC to TiKV CDC Signed-off-by: zeminzhou <[email protected]> Co-authored-by: Ping Yu <[email protected]> Co-authored-by: Jian Zhang <[email protected]>
1 parent a5cb1a1 commit 3b82376

File tree

270 files changed

+4212
-38757
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

270 files changed

+4212
-38757
lines changed

cdc/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ PACKAGE_LIST := go list ./... | grep -vE 'vendor|proto|cdc\/tests|integration|te
3232
PACKAGES := $$($(PACKAGE_LIST))
3333
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|kv_gen|proto|pb\.go|pb\.gw\.go')
3434
TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration|testing_utils')
35-
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/tikv/migration/cdc/"}|grep -v "github.com/pingcap/migration/cdc/"; done)
35+
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/tikv/migration/cdc/"}|grep -v "github.com/tikv/migration/cdc/"; done)
3636
FAILPOINT := tools/bin/failpoint-ctl
3737

3838
FAILPOINT_ENABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) enable >/dev/null)

cdc/cdc/capture/capture.go

Lines changed: 15 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import (
3030
"github.com/tikv/migration/cdc/cdc/model"
3131
"github.com/tikv/migration/cdc/cdc/owner"
3232
"github.com/tikv/migration/cdc/cdc/processor"
33-
"github.com/tikv/migration/cdc/cdc/processor/pipeline/system"
34-
ssystem "github.com/tikv/migration/cdc/cdc/sorter/leveldb/system"
33+
34+
// ssystem "github.com/tikv/migration/cdc/cdc/sorter/leveldb/system"
3535
"github.com/tikv/migration/cdc/pkg/config"
3636
cdcContext "github.com/tikv/migration/cdc/pkg/context"
3737
cerror "github.com/tikv/migration/cdc/pkg/errors"
@@ -66,10 +66,10 @@ type Capture struct {
6666
grpcPool kv.GrpcPool
6767
regionCache *tikv.RegionCache
6868
TimeAcquirer pdtime.TimeAcquirer
69-
sorterSystem *ssystem.System
69+
// sorterSystem *ssystem.System
7070

7171
enableNewScheduler bool
72-
tableActorSystem *system.System
72+
// keyspanActorSystem *system.System
7373

7474
// MessageServer is the receiver of the messages from the other nodes.
7575
// It should be recreated each time the capture is restarted.
@@ -141,40 +141,6 @@ func (c *Capture) reset(ctx context.Context) error {
141141
c.TimeAcquirer.Stop()
142142
}
143143
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)
144-
145-
if c.tableActorSystem != nil {
146-
err := c.tableActorSystem.Stop()
147-
if err != nil {
148-
log.Warn("stop table actor system failed", zap.Error(err))
149-
}
150-
}
151-
if conf.Debug.EnableTableActor {
152-
c.tableActorSystem = system.NewSystem()
153-
err = c.tableActorSystem.Start(ctx)
154-
if err != nil {
155-
return errors.Annotate(
156-
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
157-
"create table actor system")
158-
}
159-
}
160-
if conf.Debug.EnableDBSorter {
161-
if c.sorterSystem != nil {
162-
err := c.sorterSystem.Stop()
163-
if err != nil {
164-
log.Warn("stop sorter system failed", zap.Error(err))
165-
}
166-
}
167-
// Sorter dir has been set and checked when server starts.
168-
// See https://github.com/tikv/migration/cdc/blob/9dad09/cdc/server.go#L275
169-
sortDir := config.GetGlobalServerConfig().Sorter.SortDir
170-
c.sorterSystem = ssystem.NewSystem(sortDir, conf.Debug.DB)
171-
err = c.sorterSystem.Start(ctx)
172-
if err != nil {
173-
return errors.Annotate(
174-
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
175-
"create sorter system")
176-
}
177-
}
178144
if c.grpcPool != nil {
179145
c.grpcPool.Close()
180146
}
@@ -257,17 +223,17 @@ func (c *Capture) Run(ctx context.Context) error {
257223

258224
func (c *Capture) run(stdCtx context.Context) error {
259225
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
260-
PDClient: c.pdClient,
261-
KVStorage: c.kvStorage,
262-
CaptureInfo: c.info,
263-
EtcdClient: c.etcdClient,
264-
GrpcPool: c.grpcPool,
265-
RegionCache: c.regionCache,
266-
TimeAcquirer: c.TimeAcquirer,
267-
TableActorSystem: c.tableActorSystem,
268-
SorterSystem: c.sorterSystem,
269-
MessageServer: c.MessageServer,
270-
MessageRouter: c.MessageRouter,
226+
PDClient: c.pdClient,
227+
KVStorage: c.kvStorage,
228+
CaptureInfo: c.info,
229+
EtcdClient: c.etcdClient,
230+
GrpcPool: c.grpcPool,
231+
RegionCache: c.regionCache,
232+
TimeAcquirer: c.TimeAcquirer,
233+
// KeySpanActorSystem: c.keyspanActorSystem,
234+
// SorterSystem: c.sorterSystem,
235+
MessageServer: c.MessageServer,
236+
MessageRouter: c.MessageRouter,
271237
})
272238
err := c.register(ctx)
273239
if err != nil {
@@ -535,20 +501,6 @@ func (c *Capture) AsyncClose() {
535501
c.regionCache.Close()
536502
c.regionCache = nil
537503
}
538-
if c.tableActorSystem != nil {
539-
err := c.tableActorSystem.Stop()
540-
if err != nil {
541-
log.Warn("stop table actor system failed", zap.Error(err))
542-
}
543-
c.tableActorSystem = nil
544-
}
545-
if c.sorterSystem != nil {
546-
err := c.sorterSystem.Stop()
547-
if err != nil {
548-
log.Warn("stop sorter system failed", zap.Error(err))
549-
}
550-
c.sorterSystem = nil
551-
}
552504
if c.enableNewScheduler {
553505
c.grpcService.Reset(nil)
554506

cdc/cdc/capture/http_handler.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,11 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) {
166166

167167
taskStatus := make([]model.CaptureTaskStatus, 0, len(processorInfos))
168168
for captureID, status := range processorInfos {
169-
tables := make([]int64, 0)
170-
for tableID := range status.Tables {
171-
tables = append(tables, tableID)
169+
keyspans := make([]uint64, 0)
170+
for keyspanID := range status.KeySpans {
171+
keyspans = append(keyspans, keyspanID)
172172
}
173-
taskStatus = append(taskStatus, model.CaptureTaskStatus{CaptureID: captureID, Tables: tables, Operation: status.Operation})
173+
taskStatus = append(taskStatus, model.CaptureTaskStatus{CaptureID: captureID, KeySpans: keyspans, Operation: status.Operation})
174174
}
175175

176176
changefeedDetail := &model.ChangefeedDetail{
@@ -424,17 +424,17 @@ func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) {
424424
c.Status(http.StatusAccepted)
425425
}
426426

427-
// RebalanceTable rebalances tables
428-
// @Summary rebalance tables
429-
// @Description rebalance all tables of a changefeed
427+
// RebalanceKeySpan rebalances keyspans
428+
// @Summary rebalance keyspans
429+
// @Description rebalance all keyspans of a changefeed
430430
// @Tags changefeed
431431
// @Accept json
432432
// @Produce json
433433
// @Param changefeed_id path string true "changefeed_id"
434434
// @Success 202
435435
// @Failure 500,400 {object} model.HTTPError
436-
// @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post]
437-
func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
436+
// @Router /api/v1/changefeeds/{changefeed_id}/keyspans/rebalance_keyspan [post]
437+
func (h *HTTPHandler) RebalanceKeySpan(c *gin.Context) {
438438
if !h.capture.IsOwner() {
439439
h.forwardToOwner(c)
440440
return
@@ -462,19 +462,19 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
462462
c.Status(http.StatusAccepted)
463463
}
464464

465-
// MoveTable moves a table to target capture
466-
// @Summary move table
467-
// @Description move one table to the target capture
465+
// MoveKeySpan moves a keyspan to target capture
466+
// @Summary move keyspan
467+
// @Description move one keyspan to the target capture
468468
// @Tags changefeed
469469
// @Accept json
470470
// @Produce json
471471
// @Param changefeed_id path string true "changefeed_id"
472-
// @Param table_id body integer true "table_id"
472+
// @Param keyspan_id body integer true "keyspan_id"
473473
// @Param capture_id body string true "capture_id"
474474
// @Success 202
475475
// @Failure 500,400 {object} model.HTTPError
476-
// @Router /api/v1/changefeeds/{changefeed_id}/tables/move_table [post]
477-
func (h *HTTPHandler) MoveTable(c *gin.Context) {
476+
// @Router /api/v1/changefeeds/{changefeed_id}/keyspans/move_keyspan [post]
477+
func (h *HTTPHandler) MoveKeySpan(c *gin.Context) {
478478
if !h.capture.IsOwner() {
479479
h.forwardToOwner(c)
480480
return
@@ -495,7 +495,7 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {
495495

496496
data := struct {
497497
CaptureID string `json:"capture_id"`
498-
TableID int64 `json:"table_id"`
498+
KeySpanID uint64 `json:"keyspan_id"`
499499
}{}
500500
err = c.BindJSON(&data)
501501
if err != nil {
@@ -509,7 +509,7 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {
509509
}
510510

511511
_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
512-
owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID)
512+
owner.ManualSchedule(changefeedID, data.CaptureID, data.KeySpanID)
513513
return nil
514514
})
515515

@@ -586,7 +586,7 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
586586
return
587587
}
588588
position, exist := positions[captureID]
589-
// Note: for the case that no tables are attached to a newly created changefeed,
589+
// Note: for the case that no keyspans are attached to a newly created changefeed,
590590
// we just do not report an error.
591591
var processorDetail model.ProcessorDetail
592592
if exist {
@@ -596,11 +596,11 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
596596
Count: position.Count,
597597
Error: position.Error,
598598
}
599-
tables := make([]int64, 0)
600-
for tableID := range status.Tables {
601-
tables = append(tables, tableID)
599+
keyspans := make([]uint64, 0)
600+
for keyspanID := range status.KeySpans {
601+
keyspans = append(keyspans, keyspanID)
602602
}
603-
processorDetail.Tables = tables
603+
processorDetail.KeySpans = keyspans
604604
}
605605
c.IndentedJSON(http.StatusOK, &processorDetail)
606606
}

cdc/cdc/capture/http_validator.go

Lines changed: 15 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,13 @@ import (
1919

2020
"github.com/pingcap/errors"
2121
"github.com/pingcap/log"
22-
tidbkv "github.com/pingcap/tidb/kv"
2322
"github.com/r3labs/diff"
2423
"github.com/tikv/client-go/v2/oracle"
25-
"github.com/tikv/migration/cdc/cdc/entry"
26-
"github.com/tikv/migration/cdc/cdc/kv"
2724
"github.com/tikv/migration/cdc/cdc/model"
2825
"github.com/tikv/migration/cdc/cdc/sink"
2926
"github.com/tikv/migration/cdc/pkg/config"
3027
cerror "github.com/tikv/migration/cdc/pkg/errors"
31-
"github.com/tikv/migration/cdc/pkg/filter"
28+
3229
"github.com/tikv/migration/cdc/pkg/txnutil/gc"
3330
"github.com/tikv/migration/cdc/pkg/util"
3431
"github.com/tikv/migration/cdc/pkg/version"
@@ -80,19 +77,23 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
8077

8178
// init replicaConfig
8279
replicaConfig := config.GetDefaultReplicaConfig()
83-
replicaConfig.ForceReplicate = changefeedConfig.ForceReplicate
84-
if changefeedConfig.MounterWorkerNum != 0 {
85-
replicaConfig.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
86-
}
80+
/*
81+
replicaConfig.ForceReplicate = changefeedConfig.ForceReplicate
82+
if changefeedConfig.MounterWorkerNum != 0 {
83+
replicaConfig.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
84+
}
85+
*/
8786
if changefeedConfig.SinkConfig != nil {
8887
replicaConfig.Sink = changefeedConfig.SinkConfig
8988
}
90-
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
91-
replicaConfig.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
92-
}
93-
if len(changefeedConfig.FilterRules) != 0 {
94-
replicaConfig.Filter.Rules = changefeedConfig.FilterRules
95-
}
89+
/*
90+
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
91+
replicaConfig.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
92+
}
93+
if len(changefeedConfig.FilterRules) != 0 {
94+
replicaConfig.Filter.Rules = changefeedConfig.FilterRules
95+
}
96+
*/
9697

9798
captureInfos, err := capture.owner.StatusProvider().GetCaptures(ctx)
9899
if err != nil {
@@ -127,16 +128,6 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
127128
CreatorVersion: version.ReleaseVersion,
128129
}
129130

130-
if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
131-
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
132-
if err != nil {
133-
return nil, err
134-
}
135-
if len(ineligibleTables) != 0 {
136-
return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligibleTables)
137-
}
138-
}
139-
140131
tz, err := util.GetTimezone(changefeedConfig.TimeZone)
141132
if err != nil {
142133
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
@@ -164,22 +155,6 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
164155
}
165156

166157
// verify rules
167-
if len(changefeedConfig.FilterRules) != 0 {
168-
newInfo.Config.Filter.Rules = changefeedConfig.FilterRules
169-
_, err = filter.VerifyRules(newInfo.Config)
170-
if err != nil {
171-
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
172-
}
173-
}
174-
175-
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
176-
newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
177-
}
178-
179-
if changefeedConfig.MounterWorkerNum != 0 {
180-
newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
181-
}
182-
183158
if changefeedConfig.SinkConfig != nil {
184159
newInfo.Config.Sink = changefeedConfig.SinkConfig
185160
}
@@ -198,30 +173,3 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
198173

199174
return newInfo, nil
200175
}
201-
202-
func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
203-
filter, err := filter.NewFilter(replicaConfig)
204-
if err != nil {
205-
return nil, nil, errors.Trace(err)
206-
}
207-
meta, err := kv.GetSnapshotMeta(storage, startTs)
208-
if err != nil {
209-
return nil, nil, errors.Trace(err)
210-
}
211-
snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */)
212-
if err != nil {
213-
return nil, nil, errors.Trace(err)
214-
}
215-
216-
for _, tableInfo := range snap.Tables() {
217-
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
218-
continue
219-
}
220-
if !tableInfo.IsEligible(false /* forceReplicate */) {
221-
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
222-
} else {
223-
eligibleTables = append(eligibleTables, tableInfo.TableName)
224-
}
225-
}
226-
return
227-
}

cdc/cdc/capture/http_validator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
4444
require.Nil(t, newInfo)
4545

4646
// test verify success
47-
changefeedConfig = model.ChangefeedConfig{MounterWorkerNum: 32}
47+
changefeedConfig = model.ChangefeedConfig{SinkConfig: &config.SinkConfig{Protocol: "test"}}
4848
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
4949
require.Nil(t, err)
5050
require.NotNil(t, newInfo)

0 commit comments

Comments
 (0)