Skip to content

Commit cb5b15f

Browse files
authored
Merge pull request #75 from TCeason/fix_max_thread
fix(worker): disable multi-threading for time-based split key
2 parents e684b9d + 2b74e03 commit cb5b15f

File tree

2 files changed

+24
-31
lines changed

2 files changed

+24
-31
lines changed

config/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type Config struct {
6363
DisableVariantCheck bool `json:"disableVariantCheck" default:"true"`
6464
UserStage string `json:"userStage" default:"~"`
6565
DeleteAfterSync bool `json:"deleteAfterSync" default:"false"`
66-
MaxThread int `json:"maxThread" default:"1"`
66+
MaxThread int `json:"maxThread" default:"1"` // only supported with SourceSplitKey (auto increment)
6767
// Oracle
6868
OracleSID string `json:"oracleSID"`
6969
}
@@ -106,6 +106,9 @@ func preCheckConfig(cfg *Config) {
106106
}
107107
}
108108
if cfg.SourceSplitTimeKey != "" {
109+
if cfg.MaxThread > 1 {
110+
panic("SourceSplitTimeKey does not support MaxThread > 1; use SourceSplitKey for parallelism")
111+
}
109112
// time warehouse condition must be x < time and y > time
110113
err := validateSourceSplitTimeKey(cfg.SourceWhereCondition)
111114
if err != nil {

worker/worker.go

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,11 @@ func (w *Worker) stepBatch() error {
136136
}
137137

138138
func (w *Worker) StepBatchByTimeSplitKey() error {
139-
wg := &sync.WaitGroup{}
139+
// Time-based splitting uses LIMIT/OFFSET over a non-unique, mutable key,
140+
// so running multiple goroutines risks duplicates/omissions.
141+
if w.Cfg.MaxThread > 1 {
142+
return fmt.Errorf("time split does not support MaxThread > 1; use auto increment split key")
143+
}
140144
minSplitKey, maxSplitKey, err := w.Src.GetMinMaxTimeSplitKey()
141145
if err != nil {
142146
return err
@@ -150,36 +154,22 @@ func (w *Worker) StepBatchByTimeSplitKey() error {
150154
}
151155
fmt.Println("allConditions: ", len(allConditions))
152156
fmt.Println("all split conditions", allConditions)
153-
slimedRange := source.SplitTimeConditionsByMaxThread(allConditions, w.Cfg.MaxThread)
154-
fmt.Println(len(slimedRange))
155-
fmt.Println("slimedRange", slimedRange)
156-
wg.Add(w.Cfg.MaxThread)
157-
for i := 0; i < 1; i++ {
158-
go func(idx int) {
159-
defer wg.Done()
160-
conditions := slimedRange[idx]
161-
logrus.Infof("conditions in one routine: %d", len(conditions))
162-
if err != nil {
163-
logrus.Errorf("stepBatchWithCondition failed: %v", err)
164-
}
165-
for _, condition := range conditions {
166-
logrus.Infof("condition: %s", condition)
167-
switch w.Cfg.DatabaseType {
168-
case "mysql":
169-
err = w.stepBatchWithTimeCondition(condition, w.Cfg.BatchSize)
170-
case "mssql":
171-
err = w.stepBatchWithTimeConditionMssql(condition, w.Cfg.BatchSize)
172-
default:
173-
err = w.stepBatchWithTimeCondition(condition, w.Cfg.BatchSize)
174-
}
175-
if err != nil {
176-
logrus.Errorf("stepBatchWithCondition failed: %v", err)
177-
}
178-
}
179-
}(i)
180-
}
181-
wg.Wait()
182157

158+
for _, condition := range allConditions {
159+
logrus.Infof("condition: %s", condition)
160+
switch w.Cfg.DatabaseType {
161+
case "mysql":
162+
err = w.stepBatchWithTimeCondition(condition, w.Cfg.BatchSize)
163+
case "mssql":
164+
err = w.stepBatchWithTimeConditionMssql(condition, w.Cfg.BatchSize)
165+
default:
166+
err = w.stepBatchWithTimeCondition(condition, w.Cfg.BatchSize)
167+
}
168+
if err != nil {
169+
logrus.Errorf("stepBatchWithCondition failed: %v", err)
170+
return err
171+
}
172+
}
183173
return nil
184174
}
185175

0 commit comments

Comments
 (0)