Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
233a73a
feat(*): Support pd-8.5 branch
tenfyzhong Dec 30, 2025
b7b98fd
refactor(gc): rename functions for consistency
tenfyzhong Dec 30, 2025
b1c14e9
refactor(tidbtype): move TiDB metadata definitions to dedicated package
tenfyzhong Dec 30, 2025
3c88e8f
refactor(tidbtype): replace tidb types with internal wrapper
tenfyzhong Dec 30, 2025
b58397e
test(canal): Fix test expectation for row key
tenfyzhong Dec 30, 2025
f2c5e5d
test: Fix package names and add import in nextgen test files
tenfyzhong Dec 30, 2025
72afe6d
chore(test): Add -modfile flag to next-gen unit test targets
tenfyzhong Dec 30, 2025
b04a350
test(pdutil): add build-tagged mock implementations for PD client
tenfyzhong Dec 30, 2025
d8dc7a5
chore: Fix import ordering in test files
tenfyzhong Dec 30, 2025
f65fa25
fix(tidy): Update check-tidy script for multi-module support
tenfyzhong Dec 31, 2025
cebebd2
fix(tools): improve go mod tidy handling for classic/nextgen files
tenfyzhong Jan 4, 2026
adaf6aa
fix(tidy): update tidy check to include both go.mod files
tenfyzhong Jan 4, 2026
77eb01e
chore(deps): Move pingcap/sysutil to direct dependencies
tenfyzhong Jan 4, 2026
9339a01
refactor(tidy): Improve check-tidy script with helper functions
tenfyzhong Jan 4, 2026
764a6f9
refactor(test): replace ast.CIStr with tidbtype.CIStr in test files
tenfyzhong Jan 4, 2026
a08bde7
chore(makefile): Consolidate test command arguments
tenfyzhong Jan 4, 2026
1c73dbc
chore(makefile): Fix shell quoting in FAIL_ON_STDOUT variable
tenfyzhong Jan 4, 2026
3374aa5
fix(make): simplify nextgen build flag handling
tenfyzhong Jan 4, 2026
d472182
refactor(build): consolidate test configuration and targets
tenfyzhong Jan 4, 2026
e25edb2
chore: Increase unit test parallelism
tenfyzhong Jan 4, 2026
c34941c
fix(tests): Replace deprecated tidb parser import with ticdc type uti…
tenfyzhong Jan 4, 2026
905f940
chore: Fix test flag syntax in Makefile
tenfyzhong Jan 5, 2026
13adcac
fix(tests): update PD client usage and add nextgen build tag support
tenfyzhong Jan 5, 2026
447c13f
tests: refactor integration tests to use GO_RUN_TAGS env variable
tenfyzhong Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 18 additions & 34 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
generate-next-gen-grafana


FAIL_ON_STDOUT := awk '{ print } END { if (NR > 0) { exit 1 } }'
FAIL_ON_STDOUT := awk "{ print } END { if (NR > 0) { exit 1 } }"

PROJECT=ticdc
.DEFAULT_GOAL := cdc
Expand Down Expand Up @@ -74,14 +74,20 @@ ifeq ("${NEXT_GEN}", "1")
else
BUILD_FLAG := $(BUILD_FLAG),nextgen
endif
BUILD_FLAG += -modfile=nextgen.go.mod
endif

TEST_FLAG=intest
# gotestsum -p parameter for unit tests
P=8

TEST_FLAG:=-tags intest
ifeq ("${NEXT_GEN}", "1")
TEST_FLAG := $(TEST_FLAG),nextgen
TEST_FLAG := $(TEST_FLAG),nextgen -modfile=nextgen.go.mod
else
endif
TEST_FLAG += -race -p $(P)

GOTEST := CGO_ENABLED=1 $(GO) test -p 3 --race --tags=$(TEST_FLAG)
GOTEST := CGO_ENABLED=1 $(GO) test $(TEST_FLAG)

RELEASE_VERSION =
ifeq ($(RELEASE_VERSION),)
Expand Down Expand Up @@ -127,9 +133,6 @@ FAILPOINT := tools/bin/failpoint-ctl
FAILPOINT_ENABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) enable >/dev/null)
FAILPOINT_DISABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev/null)

# gotestsum -p parameter for unit tests
P=3

include tools/Makefile

go-generate: tools/bin/msgp tools/bin/stringer tools/bin/mockery
Expand Down Expand Up @@ -255,51 +258,32 @@ unit_test_in_verify_ci: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
@echo "Running unit tests..."
@export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 300s -p $(P) --race --tags=intest \
export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 1200s $(TEST_FLAG) \
-parallel=16 \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" $(PACKAGES) \
|| { $(FAILPOINT_DISABLE); exit 1; }
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)

unit_test_in_verify_ci_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
@echo "Running unit tests..."
@export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 300s -p $(P) --race --tags=intest,nextgen \
-parallel=16 \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" $(PACKAGES) \
|| { $(FAILPOINT_DISABLE); exit 1; }
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)
unit_test_in_verify_ci_next_gen:
NEXT_GEN=1 make unit_test_in_verify_ci

unit_test_pkg: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
@echo "Running unit tests..."
@export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 300s -p $(P) --race --tags=intest \
export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 1200s $(TEST_FLAG) \
-parallel=16 \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" \
$(PKG) \
|| { $(FAILPOINT_DISABLE); exit 1; }
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)

unit_test_pkg_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
@echo "Running unit tests..."
@export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 300s -p $(P) --race --tags=intest,nextgen \
-parallel=16 \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" \
$(PKG) \
|| { $(FAILPOINT_DISABLE); exit 1; }
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)
unit_test_pkg_next_gen:
NEXT_GEN=1 make unit_test_pkg

tidy:
@echo "go mod tidy"
Expand Down
3 changes: 2 additions & 1 deletion api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/keyspace"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/tidbtype"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
Expand Down Expand Up @@ -98,7 +99,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
cfg.Keyspace = keyspaceName

// verify changefeed keyspace
if err := common.ValidateKeyspace(changefeedID.Keyspace()); err != nil {
if err := tidbtype.ValidateKeyspace(changefeedID.Keyspace()); err != nil {
_ = c.Error(errors.ErrAPIInvalidParam.GenWithStack(
"invalid keyspace: %s", cfg.ID))
return
Expand Down
2 changes: 1 addition & 1 deletion api/v2/unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (h *OpenAPIV2) DeleteServiceGcSafePoint(c *gin.Context) {

keyspaceMeta := middleware.GetKeyspaceFromContext(c)

err := gc.UnifyDeleteGcSafepoint(
err := gc.RemoveServiceGCSafepoint(
c,
pdClient,
keyspaceMeta.Id,
Expand Down
4 changes: 2 additions & 2 deletions cmd/cdc/cli/cli_unsafe_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (o *unsafeResetOptions) run(cmd *cobra.Command) error {
}

if kerneltype.IsClassic() {
err := gc.UnifyDeleteGcSafepoint(ctx, o.pdClient, 0, o.etcdClient.GetGCServiceID())
err := gc.RemoveServiceGCSafepoint(ctx, o.pdClient, 0, o.etcdClient.GetGCServiceID())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func removeKeyspaceGCBarrier(ctx context.Context, pdCli pd.Client, serviceID str
log.Warn("load keyspace error", zap.String("keyspace", keyspace), zap.Error(err))
continue
}
err = gc.UnifyDeleteGcSafepoint(ctx, pdCli, keyspaceMeta.Id, serviceID)
err = gc.RemoveServiceGCSafepoint(ctx, pdCli, keyspaceMeta.Id, serviceID)
if err != nil {
log.Warn("DeleteGcSafepoint error", zap.Uint32("keyspaceID", keyspaceMeta.Id), zap.String("keyspace", keyspace), zap.String("serviceID", serviceID), zap.Error(err))
continue
Expand Down
8 changes: 4 additions & 4 deletions cmd/cdc/factory/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/pdtype"
"github.com/pingcap/ticdc/pkg/version"
pd "github.com/tikv/pd/client"
pdopt "github.com/tikv/pd/client/opt"
etcdlogutil "go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -135,12 +135,12 @@ func (f *factoryImpl) PdClient() (pd.Client, error) {
}
}

pdClient, err := pd.NewClientWithContext(
pdClient, err := pdtype.NewClientWithContext(
f.ctx, "cdc-factory", pdEndpoints, credential.PDSecurityOption(),
pdopt.WithMaxErrorRetry(maxGetPDClientRetryTimes),
pdtype.WithMaxErrorRetry(maxGetPDClientRetryTimes),
// TODO(hi-rustin): add gRPC metrics to Options.
// See also: https://github.com/pingcap/tiflow/pull/2341#discussion_r673032407.
pdopt.WithGRPCDialOptions(
pdtype.WithGRPCDialOptions(
grpcTLSOption,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand Down
4 changes: 2 additions & 2 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (c *coordinator) updateGlobalGcSafepoint(ctx context.Context) error {
// (checkpointTs - 1) from TiKV, so (checkpointTs - 1) should be an upper
// bound for the GC safepoint.
gcSafepointUpperBound := minCheckpointTs - 1
err := c.gcManager.TryUpdateGCSafePoint(ctx, gcSafepointUpperBound, false)
err := c.gcManager.TryUpdateGCSafePoint(ctx, common.DefaultKeyspace.ID, common.DefaultKeyspace.Name, gcSafepointUpperBound, false)
return errors.Trace(err)
}

Expand All @@ -452,7 +452,7 @@ func (c *coordinator) updateAllKeyspaceGcBarriers(ctx context.Context) error {

func (c *coordinator) updateKeyspaceGcBarrier(ctx context.Context, meta common.KeyspaceMeta, barrierTS uint64) error {
barrierTsUpperBound := barrierTS - 1
err := c.gcManager.TryUpdateKeyspaceGCBarrier(ctx, meta.ID, meta.Name, barrierTsUpperBound, false)
err := c.gcManager.TryUpdateGCSafePoint(ctx, meta.ID, meta.Name, barrierTsUpperBound, false)
return errors.Trace(err)
}

Expand Down
8 changes: 4 additions & 4 deletions downstreamadapter/sink/cloudstorage/defragmenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec"
"github.com/pingcap/ticdc/pkg/tidbtype"
"github.com/pingcap/ticdc/utils/chann"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -72,10 +72,10 @@ func TestDeframenter(t *testing.T) {

tidbTableInfo := &timodel.TableInfo{
ID: 100,
Name: ast.NewCIStr("table1"),
Name: tidbtype.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{ID: 1, Name: ast.NewCIStr("c1"), FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 2, Name: ast.NewCIStr("c2"), FieldType: *types.NewFieldType(mysql.TypeVarchar)},
{ID: 1, Name: tidbtype.NewCIStr("c1"), FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 2, Name: tidbtype.NewCIStr("c2"), FieldType: *types.NewFieldType(mysql.TypeVarchar)},
},
}
tableInfo := common.WrapTableInfo("test", tidbTableInfo)
Expand Down
8 changes: 4 additions & 4 deletions downstreamadapter/sink/cloudstorage/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/tidbtype"
"github.com/pingcap/ticdc/pkg/util"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -157,14 +157,14 @@ func TestWriteDDLEvent(t *testing.T) {

tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
ID: 20,
Name: ast.NewCIStr("table1"),
Name: tidbtype.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{
Name: ast.NewCIStr("col1"),
Name: tidbtype.NewCIStr("col1"),
FieldType: *types.NewFieldType(mysql.TypeLong),
},
{
Name: ast.NewCIStr("col2"),
Name: tidbtype.NewCIStr("col2"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
},
Expand Down
8 changes: 4 additions & 4 deletions downstreamadapter/sink/cloudstorage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/pkg/tidbtype"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/utils/chann"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -76,10 +76,10 @@ func TestWriterRun(t *testing.T) {

tidbTableInfo := &timodel.TableInfo{
ID: 100,
Name: ast.NewCIStr("table1"),
Name: tidbtype.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{ID: 1, Name: ast.NewCIStr("c1"), FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 2, Name: ast.NewCIStr("c2"), FieldType: *types.NewFieldType(mysql.TypeVarchar)},
{ID: 1, Name: tidbtype.NewCIStr("c1"), FieldType: *types.NewFieldType(mysql.TypeLong)},
{ID: 2, Name: tidbtype.NewCIStr("c2"), FieldType: *types.NewFieldType(mysql.TypeVarchar)},
},
}
tableInfo := commonType.WrapTableInfo("test", tidbTableInfo)
Expand Down
32 changes: 16 additions & 16 deletions downstreamadapter/sink/columnselector/column_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"testing"

"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/tidbtype"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -80,13 +80,13 @@ func TestColumnSelectorGetSelector(t *testing.T) {
selector := selectors.Get("test", "t1")
columns := []*model.ColumnInfo{
{
Name: ast.NewCIStr("a"),
Name: tidbtype.NewCIStr("a"),
},
{
Name: ast.NewCIStr("b"),
Name: tidbtype.NewCIStr("b"),
},
{
Name: ast.NewCIStr("c"),
Name: tidbtype.NewCIStr("c"),
},
}
for _, col := range columns {
Expand All @@ -102,13 +102,13 @@ func TestColumnSelectorGetSelector(t *testing.T) {
selector := selectors.Get("test1", "aaa")
columns := []*model.ColumnInfo{
{
Name: ast.NewCIStr("a"),
Name: tidbtype.NewCIStr("a"),
},
{
Name: ast.NewCIStr("b"),
Name: tidbtype.NewCIStr("b"),
},
{
Name: ast.NewCIStr("c"),
Name: tidbtype.NewCIStr("c"),
},
}
for _, col := range columns {
Expand All @@ -124,13 +124,13 @@ func TestColumnSelectorGetSelector(t *testing.T) {
selector := selectors.Get("test2", "t2")
columns := []*model.ColumnInfo{
{
Name: ast.NewCIStr("a"),
Name: tidbtype.NewCIStr("a"),
},
{
Name: ast.NewCIStr("col2"),
Name: tidbtype.NewCIStr("col2"),
},
{
Name: ast.NewCIStr("col1"),
Name: tidbtype.NewCIStr("col1"),
},
}
for _, col := range columns {
Expand All @@ -146,13 +146,13 @@ func TestColumnSelectorGetSelector(t *testing.T) {
selector := selectors.Get("test3", "t3")
columns := []*model.ColumnInfo{
{
Name: ast.NewCIStr("a"),
Name: tidbtype.NewCIStr("a"),
},
{
Name: ast.NewCIStr("col2"),
Name: tidbtype.NewCIStr("col2"),
},
{
Name: ast.NewCIStr("col1"),
Name: tidbtype.NewCIStr("col1"),
},
}
for _, col := range columns {
Expand All @@ -168,13 +168,13 @@ func TestColumnSelectorGetSelector(t *testing.T) {
selector := selectors.Get("test4", "t4")
columns := []*model.ColumnInfo{
{
Name: ast.NewCIStr("a"),
Name: tidbtype.NewCIStr("a"),
},
{
Name: ast.NewCIStr("col2"),
Name: tidbtype.NewCIStr("col2"),
},
{
Name: ast.NewCIStr("col1"),
Name: tidbtype.NewCIStr("col1"),
},
}
for _, col := range columns {
Expand Down
6 changes: 3 additions & 3 deletions downstreamadapter/sink/mysql/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/sink/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/ticdc/pkg/tidbtype"
"github.com/stretchr/testify/require"
)

Expand All @@ -36,7 +36,7 @@ func getMysqlSink() (context.Context, *Sink, sqlmock.Sqlmock) {
cfg := mysql.New()
cfg.WorkerCount = 1
cfg.DMLMaxRetry = 1
cfg.MaxAllowedPacket = int64(vardef.DefMaxAllowedPacket)
cfg.MaxAllowedPacket = int64(tidbtype.DefMaxAllowedPacket)
cfg.CachePrepStmts = false

sink := NewMySQLSink(ctx, changefeedID, cfg, db, false)
Expand All @@ -51,7 +51,7 @@ func getMysqlSinkWithDDLTs() (context.Context, *Sink, sqlmock.Sqlmock) {
cfg := mysql.New()
cfg.WorkerCount = 1
cfg.DMLMaxRetry = 1
cfg.MaxAllowedPacket = int64(vardef.DefMaxAllowedPacket)
cfg.MaxAllowedPacket = int64(tidbtype.DefMaxAllowedPacket)
cfg.CachePrepStmts = false
cfg.EnableDDLTs = true // Enable DDL-ts feature for testing

Expand Down
Loading