Skip to content

Commit 6da0b8a

Browse files
author
Shlomi Noach
committed
Dynamic DML batch size; apadting buffer size to match
1 parent 94a325c commit 6da0b8a

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

go/base/context.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ const (
4646
)
4747

4848
const (
49-
HTTPStatusOK = 200
50-
maxBatchSize = 1000
49+
HTTPStatusOK = 200
50+
MaxEventsBatchSize = 1000
5151
)
5252

5353
var (
@@ -442,8 +442,8 @@ func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
442442
if batchSize < 1 {
443443
batchSize = 1
444444
}
445-
if batchSize > maxBatchSize {
446-
batchSize = maxBatchSize
445+
if batchSize > MaxEventsBatchSize {
446+
batchSize = MaxEventsBatchSize
447447
}
448448
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
449449
}

go/logic/migrator.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct
5252
return result
5353
}
5454

55-
const (
56-
applyEventsQueueBuffer = 100
57-
)
58-
5955
type PrintStatusRule int
6056

6157
const (
@@ -101,7 +97,7 @@ func NewMigrator() *Migrator {
10197
allEventsUpToLockProcessed: make(chan string),
10298

10399
copyRowsQueue: make(chan tableWriteFunc),
104-
applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
100+
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
105101
handledChangelogStates: make(map[string]bool),
106102
}
107103
return migrator
@@ -767,9 +763,10 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
767763
))
768764
maxLoad := this.migrationContext.GetMaxLoad()
769765
criticalLoad := this.migrationContext.GetCriticalLoad()
770-
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; max-load: %s; critical-load: %s; nice-ratio: %f",
766+
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f",
771767
atomic.LoadInt64(&this.migrationContext.ChunkSize),
772768
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
769+
atomic.LoadInt64(&this.migrationContext.DMLBatchSize),
773770
maxLoad.String(),
774771
criticalLoad.String(),
775772
this.migrationContext.GetNiceRatio(),

go/logic/server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,19 @@ help # This message
187187
return ForcePrintStatusAndHintRule, nil
188188
}
189189
}
190+
case "dml-batch-size":
191+
{
192+
if argIsQuestion {
193+
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
194+
return NoPrintStatusRule, nil
195+
}
196+
if dmlBatchSize, err := strconv.Atoi(arg); err != nil {
197+
return NoPrintStatusRule, err
198+
} else {
199+
this.migrationContext.SetDMLBatchSize(int64(dmlBatchSize))
200+
return ForcePrintStatusAndHintRule, nil
201+
}
202+
}
190203
case "max-lag-millis":
191204
{
192205
if argIsQuestion {

0 commit comments

Comments
 (0)