diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 86ea1da5fa..3db046e5ad 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -388,6 +388,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error { if c.SyncerConfig.QueueSize == 0 { c.SyncerConfig.QueueSize = defaultQueueSize } + if c.SyncerConfig.EventCacheCount == 0 { + c.SyncerConfig.EventCacheCount = defaultEventCacheCount + } if c.SyncerConfig.CheckpointFlushInterval == 0 { c.SyncerConfig.CheckpointFlushInterval = defaultCheckpointFlushInterval } diff --git a/dm/config/task.go b/dm/config/task.go index 6eb3b4e68e..ca80056400 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -103,7 +103,8 @@ var ( defaultWorkerCount = 16 defaultBatch = 100 defaultQueueSize = 1024 // do not give too large default value to avoid OOM - defaultCheckpointFlushInterval = 30 // in seconds + defaultEventCacheCount = 10240 + defaultCheckpointFlushInterval = 30 // in seconds defaultSafeModeDuration = strconv.Itoa(2*defaultCheckpointFlushInterval) + "s" // TargetDBConfig. @@ -394,10 +395,11 @@ func (m *LoaderConfig) adjust() error { // SyncerConfig represents syncer process unit's specific config. type SyncerConfig struct { - MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly - WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"` - Batch int `yaml:"batch" toml:"batch" json:"batch"` - QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"` + MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly + WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"` + Batch int `yaml:"batch" toml:"batch" json:"batch"` + QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"` + EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"` // checkpoint flush interval in seconds. CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"` // TODO: add this two new config items for openapi. @@ -424,6 +426,7 @@ func DefaultSyncerConfig() SyncerConfig { WorkerCount: defaultWorkerCount, Batch: defaultBatch, QueueSize: defaultQueueSize, + EventCacheCount: defaultEventCacheCount, CheckpointFlushInterval: defaultCheckpointFlushInterval, SafeModeDuration: defaultSafeModeDuration, } @@ -452,6 +455,7 @@ type ValidatorConfig struct { BatchQuerySize int `yaml:"batch-query-size" toml:"batch-query-size" json:"batch-query-size"` MaxPendingRowSize string `yaml:"max-pending-row-size" toml:"max-pending-row-size" json:"max-pending-row-size"` MaxPendingRowCount int `yaml:"max-pending-row-count" toml:"max-pending-row-count" json:"max-pending-row-count"` + EventCacheCount int `yaml:"event-cache-count" toml:"event-cache-count" json:"event-cache-count"` StartTime string `yaml:"-" toml:"start-time" json:"-"` } @@ -491,12 +495,16 @@ func (v *ValidatorConfig) Adjust() error { if v.MaxPendingRowCount == 0 { v.MaxPendingRowCount = DefaultValidatorMaxPendingRow } + if v.EventCacheCount == 0 { + v.EventCacheCount = defaultEventCacheCount + } return nil } func defaultValidatorConfig() ValidatorConfig { return ValidatorConfig{ - Mode: ValidationNone, + Mode: ValidationNone, + EventCacheCount: defaultEventCacheCount, } } @@ -887,6 +895,9 @@ func (c *TaskConfig) adjust() error { if inst.Syncer.QueueSize == 0 { inst.Syncer.QueueSize = defaultQueueSize } + if inst.Syncer.EventCacheCount == 0 { + inst.Syncer.EventCacheCount = defaultEventCacheCount + } if inst.Syncer.CheckpointFlushInterval == 0 { inst.Syncer.CheckpointFlushInterval = defaultCheckpointFlushInterval } @@ -1165,6 +1176,7 @@ type SyncerConfigForDowngrade struct { WorkerCount int `yaml:"worker-count"` Batch int `yaml:"batch"` QueueSize int `yaml:"queue-size"` + EventCacheCount int `yaml:"event-cache-count"` CheckpointFlushInterval int `yaml:"checkpoint-flush-interval"` MaxRetry int `yaml:"max-retry"` EnableGTID bool `yaml:"enable-gtid"` @@ -1186,6 +1198,7 @@ func NewSyncerConfigsForDowngrade(syncerConfigs map[string]*SyncerConfig) map[st WorkerCount: syncerConfig.WorkerCount, Batch: syncerConfig.Batch, QueueSize: syncerConfig.QueueSize, + EventCacheCount: syncerConfig.EventCacheCount, CheckpointFlushInterval: syncerConfig.CheckpointFlushInterval, MaxRetry: syncerConfig.MaxRetry, EnableGTID: syncerConfig.EnableGTID, diff --git a/dm/syncer/data_validator.go b/dm/syncer/data_validator.go index f471e139d3..633d733a65 100644 --- a/dm/syncer/data_validator.go +++ b/dm/syncer/data_validator.go @@ -314,7 +314,7 @@ func (v *DataValidator) initialize() error { return err } - v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList) + v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList, v.cfg.ValidatorCfg.EventCacheCount) if err != nil { return err } diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 3534d3774b..740b499c79 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -386,7 +386,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return terror.ErrSyncerUnitGenBAList.Delegate(err) } - s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList) + s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList, s.cfg.SyncerConfig.EventCacheCount) if err != nil { return err } diff --git a/dm/syncer/util.go b/dm/syncer/util.go index 1ee1ac8dde..10db7efb66 100644 --- a/dm/syncer/util.go +++ b/dm/syncer/util.go @@ -139,7 +139,7 @@ func str2TimezoneOrFromDB(tctx *tcontext.Context, tzStr string, dbCfg conn.Scope return loc, tzStr, nil } -func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter) (replication.BinlogSyncerConfig, error) { +func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter, eventCacheCount int) (replication.BinlogSyncerConfig, error) { var tlsConfig *tls.Config var err error if cfg.From.Security != nil { @@ -213,6 +213,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati TLSConfig: tlsConfig, RowsEventDecodeFunc: rowsEventDecodeFunc, Localhost: h, + EventCacheCount: eventCacheCount, } // when retry count > 1, go-mysql will retry sync from the previous GTID set in GTID mode, // which may get duplicate binlog event after retry success. so just set retry count = 1, and task diff --git a/dm/syncer/util_test.go b/dm/syncer/util_test.go index 8c4391ffd5..3296b7a0d4 100644 --- a/dm/syncer/util_test.go +++ b/dm/syncer/util_test.go @@ -19,10 +19,12 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" _ "github.com/pingcap/tidb/pkg/types/parser_driver" "github.com/pingcap/tidb/pkg/util/filter" + "github.com/pingcap/tiflow/dm/config" "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/syncer/dbconn" @@ -128,6 +130,19 @@ func TestRecordSourceTbls(t *testing.T) { require.Len(t, sourceTbls, 0) } +func TestSubtaskCfg2BinlogSyncerCfgEventCacheCount(t *testing.T) { + cfg := &config.SubTaskConfig{ + ServerID: 1234, + Flavor: mysql.MySQLFlavor, + WorkerName: "worker-01", + From: config.GetDBConfigForTest(), + } + + syncCfg, err := subtaskCfg2BinlogSyncerCfg(cfg, time.UTC, nil, 2048) + require.NoError(t, err) + require.Equal(t, 2048, syncCfg.EventCacheCount) +} + func TestGetDDLStatusFromTiDB(t *testing.T) { var ( cfg = genDefaultSubTaskConfig4Test()