Skip to content
Open
4 changes: 2 additions & 2 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,12 +1541,12 @@ func getVerifiedTables(

func GetKeyspaceValueWithDefault(c *gin.Context) string {
if kerneltype.IsClassic() {
return common.DefaultKeyspaceNamme
return common.DefaultKeyspaceName
}

keyspace := c.Query(api.APIOpVarKeyspace)
if keyspace == "" {
keyspace = common.DefaultKeyspaceNamme
keyspace = common.DefaultKeyspaceName
}
return keyspace
}
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func newWriter(ctx context.Context, o *option) *writer {
log.Info("event router created", zap.Any("protocol", o.protocol),
zap.Any("topic", o.topic), zap.Any("dispatcherRules", o.sinkConfig.DispatchRules))

changefeedID := commonType.NewChangeFeedIDWithName("kafka-consumer", commonType.DefaultKeyspaceNamme)
changefeedID := commonType.NewChangeFeedIDWithName("kafka-consumer", commonType.DefaultKeyspaceName)
cfg := &config.ChangefeedConfig{
ChangefeedID: changefeedID,
SinkURI: o.downstreamURI,
Expand Down
2 changes: 1 addition & 1 deletion cmd/pulsar-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newWriter(ctx context.Context, o *option) *writer {
log.Info("event router created", zap.Any("protocol", o.protocol),
zap.Any("topic", o.topic), zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules))

changefeedID := commonType.NewChangeFeedIDWithName("pulsar-consumer", commonType.DefaultKeyspaceNamme)
changefeedID := commonType.NewChangeFeedIDWithName("pulsar-consumer", commonType.DefaultKeyspaceName)
cfg := &config.ChangefeedConfig{
ChangefeedID: changefeedID,
SinkURI: o.downstreamURI,
Expand Down
2 changes: 1 addition & 1 deletion cmd/storage-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newConsumer(ctx context.Context) (*consumer, error) {
SinkURI: downstreamURIStr,
SinkConfig: replicaConfig.Sink,
}
sink, err := sink.New(stdCtx, cfg, commonType.NewChangeFeedIDWithName(defaultChangefeedName, commonType.DefaultKeyspaceNamme))
sink, err := sink.New(stdCtx, cfg, commonType.NewChangeFeedIDWithName(defaultChangefeedName, commonType.DefaultKeyspaceName))
if err != nil {
log.Error("failed to create sink", zap.Error(err))
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions coordinator/changefeed/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func TestRetry(t *testing.T) {
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme), time.Minute*30, 1)
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), time.Minute*30, 1)
require.True(t, backoff.ShouldRun())

// stop the backoff
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestRetry(t *testing.T) {
}

func TestErrorReportedWhenRetrying(t *testing.T) {
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme), time.Minute*30, 1)
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), time.Minute*30, 1)
require.True(t, backoff.ShouldRun())

changefeed, state, err := backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestErrorReportedWhenRetrying(t *testing.T) {
}

func TestFailedWhenRetry(t *testing.T) {
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme), time.Second*30, 1)
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), time.Second*30, 1)
require.True(t, backoff.ShouldRun())

mc := clock.NewMock()
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestFailedWhenRetry(t *testing.T) {
}

func TestNormal(t *testing.T) {
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme), time.Second*10, 1)
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), time.Second*10, 1)
require.True(t, backoff.ShouldRun())

changefeed, state, err := backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
Expand Down
58 changes: 29 additions & 29 deletions coordinator/changefeed/changefeed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func TestAddAbsentChangefeed(t *testing.T) {
db := NewChangefeedDB(1216)
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)}
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)}
cf.backoff = NewBackoff(cf.ID, 0, 0)

db.AddAbsentChangefeed(cf)
Expand All @@ -39,7 +39,7 @@ func TestAddAbsentChangefeed(t *testing.T) {

func TestAddStoppedChangefeed(t *testing.T) {
db := NewChangefeedDB(1216)
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)}
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)}

db.AddStoppedChangefeed(cf)

Expand All @@ -49,7 +49,7 @@ func TestAddStoppedChangefeed(t *testing.T) {

func TestAddReplicatingMaintainer(t *testing.T) {
db := NewChangefeedDB(1216)
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)}
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)}
nodeID := node.ID("node-1")

db.AddReplicatingMaintainer(cf, nodeID)
Expand All @@ -61,7 +61,7 @@ func TestAddReplicatingMaintainer(t *testing.T) {

func TestStopByChangefeedID(t *testing.T) {
db := NewChangefeedDB(1216)
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)}
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)}
db.AddReplicatingMaintainer(cf, node.ID("node-1"))
require.Contains(t, db.GetByNodeID("node-1"), cf)

Expand All @@ -77,12 +77,12 @@ func TestStopByChangefeedID(t *testing.T) {
_, ok := sizeMap["node-1"]
require.False(t, ok)

require.Equal(t, "", db.StopByChangefeedID(common.NewChangeFeedIDWithName("a", common.DefaultKeyspaceNamme), false).String())
require.Equal(t, "", db.StopByChangefeedID(common.NewChangeFeedIDWithName("a", common.DefaultKeyspaceName), false).String())
}

func TestMoveToSchedulingQueue(t *testing.T) {
db := NewChangefeedDB(1216)
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)}
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)}
db.AddStoppedChangefeed(cf)
cf.backoff = NewBackoff(cf.ID, 0, 0)
cf.status = atomic.NewPointer(&heartbeatpb.MaintainerStatus{
Expand All @@ -97,15 +97,15 @@ func TestMoveToSchedulingQueue(t *testing.T) {

func TestRemoveChangefeed(t *testing.T) {
db := NewChangefeedDB(1216)
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)}
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)}
db.AddAbsentChangefeed(cf)

db.StopByChangefeedID(cf.ID, false)
require.NotContains(t, db.GetAbsent(), cf)
require.Contains(t, db.changefeeds, cf.ID)
require.Contains(t, db.stopped, cf.ID)

cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceNamme)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName)}
db.AddReplicatingMaintainer(cf2, "node1")
require.Contains(t, db.GetByNodeID("node1"), cf2)
require.Equal(t, node.ID("node1"), db.StopByChangefeedID(cf2.ID, true))
Expand All @@ -121,7 +121,7 @@ func TestRemoveChangefeed(t *testing.T) {

func TestGetByID(t *testing.T) {
db := NewChangefeedDB(1216)
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)}
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)}
db.AddStoppedChangefeed(cf)

result := db.GetByID(cf.ID)
Expand All @@ -130,8 +130,8 @@ func TestGetByID(t *testing.T) {

func TestChangefeedDBGetAllChangefeeds(t *testing.T) {
db := NewChangefeedDB(1216)
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceNamme)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceNamme)}
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceName)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName)}
db.AddAbsentChangefeed(cf1)
db.AddAbsentChangefeed(cf2)

Expand All @@ -143,14 +143,14 @@ func TestChangefeedDBGetAllChangefeeds(t *testing.T) {

func TestGetWaitingSchedulingChangefeeds(t *testing.T) {
db := NewChangefeedDB(1216)
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceNamme)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceNamme)}
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceName)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName)}
cf1.backoff = NewBackoff(cf1.ID, 0, 0)
cf1.backoff.failed.Store(true)
cf2.backoff = NewBackoff(cf2.ID, 0, 0)
db.AddAbsentChangefeed(cf1)
db.AddReplicatingMaintainer(cf2, "node1")
cf3 := &Changefeed{ID: common.NewChangeFeedIDWithName("test3", common.DefaultKeyspaceNamme)}
cf3 := &Changefeed{ID: common.NewChangeFeedIDWithName("test3", common.DefaultKeyspaceName)}
cf3.backoff = NewBackoff(cf3.ID, 0, 0)
db.AddAbsentChangefeed(cf3)

Expand All @@ -167,8 +167,8 @@ func TestGetWaitingSchedulingChangefeeds(t *testing.T) {

func TestGetAllStoppedChangefeeds(t *testing.T) {
db := NewChangefeedDB(1216)
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceNamme)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceNamme)}
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceName)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName)}
db.AddStoppedChangefeed(cf1)
db.AddStoppedChangefeed(cf2)

Expand All @@ -177,8 +177,8 @@ func TestGetAllStoppedChangefeeds(t *testing.T) {

func TestGetAllReplicatingMaintainers(t *testing.T) {
db := NewChangefeedDB(1216)
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceNamme)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceNamme)}
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceName)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName)}
nodeID1 := node.ID("node-1")
nodeID2 := node.ID("node-2")
db.AddReplicatingMaintainer(cf1, nodeID1)
Expand All @@ -191,11 +191,11 @@ func TestGetAllReplicatingMaintainers(t *testing.T) {

func TestGetSize(t *testing.T) {
db := NewChangefeedDB(1216)
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceNamme)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceNamme)}
cf1 := &Changefeed{ID: common.NewChangeFeedIDWithName("test1", common.DefaultKeyspaceName)}
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName)}
db.AddReplicatingMaintainer(cf1, "node-1")
db.AddAbsentChangefeed(cf2)
db.AddStoppedChangefeed(&Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceNamme)})
db.AddStoppedChangefeed(&Changefeed{ID: common.NewChangeFeedIDWithName("test2", common.DefaultKeyspaceName)})
require.Equal(t, 1, db.GetReplicatingSize())
require.Equal(t, 1, db.GetStoppedSize())
require.Equal(t, 1, db.GetAbsentSize())
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestGetSize(t *testing.T) {

func TestReplaceStoppedChangefeed(t *testing.T) {
db := NewChangefeedDB(1216)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
cf := &Changefeed{
ID: cfID,
info: atomic.NewPointer(&config.ChangeFeedInfo{
Expand All @@ -247,7 +247,7 @@ func TestReplaceStoppedChangefeed(t *testing.T) {
cf3 := db.GetByID(cf.ID)
require.Equal(t, true, cf3.NeedCheckpointTsMessage())

cf4ID := common.NewChangeFeedIDWithName("test4", common.DefaultKeyspaceNamme)
cf4ID := common.NewChangeFeedIDWithName("test4", common.DefaultKeyspaceName)
cf4 := &config.ChangeFeedInfo{
ChangefeedID: cf4ID,
SinkURI: "kafka://127.0.0.1:9092",
Expand All @@ -259,7 +259,7 @@ func TestReplaceStoppedChangefeed(t *testing.T) {

func TestScheduleChangefeed(t *testing.T) {
db := NewChangefeedDB(1216)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
cf := NewChangefeed(cfID, &config.ChangeFeedInfo{
ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestCalculateGCSafepoint(t *testing.T) {
db := NewChangefeedDB(1216)
require.True(t, math.MaxUint64 == db.CalculateGlobalGCSafepoint())

cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
cf1 := NewChangefeed(cfID,
&config.ChangeFeedInfo{
ChangefeedID: cfID,
Expand All @@ -299,7 +299,7 @@ func TestCalculateGCSafepoint(t *testing.T) {
db.AddStoppedChangefeed(cf1)
require.Equal(t, uint64(11), db.CalculateGlobalGCSafepoint())

cf2ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cf2ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
cf2 := NewChangefeed(cf2ID,
&config.ChangeFeedInfo{
ChangefeedID: cf2ID,
Expand All @@ -309,7 +309,7 @@ func TestCalculateGCSafepoint(t *testing.T) {
db.AddStoppedChangefeed(cf2)
require.Equal(t, uint64(11), db.CalculateGlobalGCSafepoint())

cf3ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cf3ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
cf3 := NewChangefeed(cf3ID,
&config.ChangeFeedInfo{
ChangefeedID: cf3ID,
Expand All @@ -319,7 +319,7 @@ func TestCalculateGCSafepoint(t *testing.T) {
db.AddStoppedChangefeed(cf3)
require.Equal(t, uint64(10), db.CalculateGlobalGCSafepoint())

cf4ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cf4ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
cf4 := NewChangefeed(cf4ID,
&config.ChangeFeedInfo{
ChangefeedID: cf4ID,
Expand All @@ -332,7 +332,7 @@ func TestCalculateGCSafepoint(t *testing.T) {
db.AddStoppedChangefeed(cf4)
require.Equal(t, uint64(10), db.CalculateGlobalGCSafepoint())

cf5ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cf5ID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
cf5 := NewChangefeed(cf5ID,
&config.ChangeFeedInfo{
ChangefeedID: cf5ID,
Expand Down
20 changes: 10 additions & 10 deletions coordinator/changefeed/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func TestNewChangefeed(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -42,7 +42,7 @@ func TestNewChangefeed(t *testing.T) {
}

func TestChangefeed_GetSetInfo(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -60,7 +60,7 @@ func TestChangefeed_GetSetInfo(t *testing.T) {
}

func TestChangefeed_GetSetNodeID(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -74,7 +74,7 @@ func TestChangefeed_GetSetNodeID(t *testing.T) {
}

func TestChangefeed_UpdateStatus(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -91,7 +91,7 @@ func TestChangefeed_UpdateStatus(t *testing.T) {
}

func TestChangefeed_IsMQSink(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -103,7 +103,7 @@ func TestChangefeed_IsMQSink(t *testing.T) {
}

func TestChangefeed_GetSetLastSavedCheckPointTs(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -117,7 +117,7 @@ func TestChangefeed_GetSetLastSavedCheckPointTs(t *testing.T) {
}

func TestChangefeed_NewAddMaintainerMessage(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -132,7 +132,7 @@ func TestChangefeed_NewAddMaintainerMessage(t *testing.T) {
}

func TestChangefeed_NewRemoveMaintainerMessage(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -147,7 +147,7 @@ func TestChangefeed_NewRemoveMaintainerMessage(t *testing.T) {
}

func TestChangefeed_NewCheckpointTsMessage(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
info := &config.ChangeFeedInfo{
SinkURI: "kafka://127.0.0.1:9092",
State: config.StateNormal,
Expand All @@ -162,7 +162,7 @@ func TestChangefeed_NewCheckpointTsMessage(t *testing.T) {
}

func TestRemoveMaintainerMessage(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
server := node.ID("server-1")
msg := RemoveMaintainerMessage(common.DefaultKeyspaceID, cfID, server, true, true)
require.Equal(t, server, msg.To)
Expand Down
Loading