
Commit 8d8ef34

Author: Shlomi Noach
Merge pull request #352 from github/batch-apply-dml-events
Batch apply dml events
2 parents e17f41b + 1157027 commit 8d8ef34

File tree

6 files changed: +150 additions, −13 deletions

RELEASE_VERSION

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-1.0.32
+1.0.34

doc/command-line-flags.md

Lines changed: 11 additions & 0 deletions
@@ -65,6 +65,17 @@ At this time (10-2016) `gh-ost` does not support foreign keys on migrated tables
 
 See also: [`skip-foreign-key-checks`](#skip-foreign-key-checks)
 
+### dml-batch-size
+
+`gh-ost` reads events from the binary log and applies them onto the _ghost_ table. It does so in batched writes: grouping multiple events to apply in a single transaction. This gives better write throughput, as we don't need to sync the transaction log to disk for each event.
+
+The `--dml-batch-size` flag controls the size of the batched write. Allowed values are `1 - 100`, where `1` means no batching (every event from the binary log is applied onto the _ghost_ table in its own transaction). The default value is `10`.
+
+Why is this behavior configurable? Different workloads have different characteristics. Some workloads have very large writes, such that aggregating even `50` writes into a transaction makes for a significant transaction size. In other workloads the write rate is so high that one simply can't afford a hundred additional syncs to disk per second. The default value of `10` is a modest compromise that should work well for most workloads. Your mileage may vary.
+
+Noteworthy is that setting `--dml-batch-size` to a higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimum. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log; it just writes what it already has. This conveniently suggests that if the write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply just a fraction of the batch size.
+
 ### exact-rowcount
 
 A `gh-ost` execution needs to copy whatever rows you have in your existing table onto the ghost table. This can be, and often is, a large number. Exactly what is that number?
go/base/context.go

Lines changed: 12 additions & 0 deletions
@@ -148,6 +148,7 @@ type MigrationContext struct {
 	controlReplicasLagResult mysql.ReplicationLagResult
 	TotalRowsCopied          int64
 	TotalDMLEventsApplied    int64
+	DMLBatchSize             int64
 	isThrottled              bool
 	throttleReason           string
 	throttleReasonHint       ThrottleReasonHint
@@ -207,6 +208,7 @@ func newMigrationContext() *MigrationContext {
 		ApplierConnectionConfig:             mysql.NewConnectionConfig(),
 		MaxLagMillisecondsThrottleThreshold: 1500,
 		CutOverLockTimeoutSeconds:           3,
+		DMLBatchSize:                        10,
 		maxLoad:                             NewLoadMap(),
 		criticalLoad:                        NewLoadMap(),
 		throttleMutex:                       &sync.Mutex{},
@@ -417,6 +419,16 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
 	atomic.StoreInt64(&this.ChunkSize, chunkSize)
 }
 
+func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
+	if batchSize < 1 {
+		batchSize = 1
+	}
+	if batchSize > 100 {
+		batchSize = 100
+	}
+	atomic.StoreInt64(&this.DMLBatchSize, batchSize)
+}
+
 func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
 	this.throttleMutex.Lock()
 	defer this.throttleMutex.Unlock()
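
Note that `SetDMLBatchSize` clamps out-of-range values rather than returning an error, and stores the result with `atomic.StoreInt64` so readers need no lock. A minimal sketch of that store/load pairing for a shared tunable (hypothetical type, not gh-ost's code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tunable is a runtime-adjustable parameter shared between goroutines.
type tunable struct{ value int64 }

// Set clamps into [1, 100], then stores atomically.
func (t *tunable) Set(v int64) {
	if v < 1 {
		v = 1
	}
	if v > 100 {
		v = 100
	}
	atomic.StoreInt64(&t.value, v)
}

// Get may be called from a hot loop without locking.
func (t *tunable) Get() int64 { return atomic.LoadInt64(&t.value) }

func main() {
	t := &tunable{}
	t.Set(500)           // out of range: clamped
	fmt.Println(t.Get()) // prints 100
}
```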

go/cmd/gh-ost/main.go

Lines changed: 2 additions & 0 deletions
@@ -83,6 +83,7 @@ func main() {
 	flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
 	flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, even after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain the RBR setting is applied. Such an operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges")
 	chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
+	dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
 	defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
 	cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
 	niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spent in a rowcopy chunk, spend 7ms sleeping immediately after")
@@ -223,6 +224,7 @@ func main() {
 	migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
 	migrationContext.SetNiceRatio(*niceRatio)
 	migrationContext.SetChunkSize(*chunkSize)
+	migrationContext.SetDMLBatchSize(*dmlBatchSize)
 	migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
 	migrationContext.SetThrottleQuery(*throttleQuery)
 	migrationContext.SetDefaultNumRetries(*defaultRetries)

go/logic/applier.go

Lines changed: 52 additions & 0 deletions
@@ -950,3 +950,55 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
 	}
 	return nil
 }
+
+// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
+func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
+
+	var totalDelta int64
+
+	err := func() error {
+		tx, err := this.db.Begin()
+		if err != nil {
+			return err
+		}
+
+		rollback := func(err error) error {
+			tx.Rollback()
+			return err
+		}
+
+		sessionQuery := `SET
+			SESSION time_zone = '+00:00',
+			sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
+		`
+		if _, err := tx.Exec(sessionQuery); err != nil {
+			return rollback(err)
+		}
+		for _, dmlEvent := range dmlEvents {
+			query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
+			if err != nil {
+				return rollback(err)
+			}
+			if _, err := tx.Exec(query, args...); err != nil {
+				err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
+				return rollback(err)
+			}
+			totalDelta += rowDelta
+		}
+		if err := tx.Commit(); err != nil {
+			return err
+		}
+		return nil
+	}()
+
+	if err != nil {
+		return log.Errore(err)
+	}
+	// no error
+	atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents)))
+	if this.migrationContext.CountTableRows {
+		atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
+	}
+	log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
+	return nil
+}
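
The `rollback` closure above keeps every error path inside the transaction to a single line: roll back, then pass the error up. A self-contained sketch of the same pattern with `database/sql`, using a hypothetical DSN and table (not gh-ost's code; the driver import is an assumption):

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql" // assumed driver; any database/sql driver works
)

// applyBatch executes all queries in one transaction: one commit (and one
// transaction-log sync) per batch instead of one per statement.
func applyBatch(db *sql.DB, queries []string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// rollback wraps an error path: undo the transaction, return the error.
	rollback := func(err error) error {
		tx.Rollback()
		return err
	}
	for _, query := range queries {
		if _, err := tx.Exec(query); err != nil {
			return rollback(fmt.Errorf("%s; query=%s", err.Error(), query))
		}
	}
	return tx.Commit()
}

func main() {
	// Hypothetical DSN; sql.Open defers connecting until first use.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/test")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	queries := []string{"insert into t values (1)", "insert into t values (2)"}
	if err := applyBatch(db, queries); err != nil {
		fmt.Println("batch failed:", err)
	}
}
```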

go/logic/migrator.go

Lines changed: 72 additions & 12 deletions
@@ -37,6 +37,21 @@ func ReadChangelogState(s string) ChangelogState {
 
 type tableWriteFunc func() error
 
+type applyEventStruct struct {
+	writeFunc *tableWriteFunc
+	dmlEvent  *binlog.BinlogDMLEvent
+}
+
+func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct {
+	result := &applyEventStruct{writeFunc: writeFunc}
+	return result
+}
+
+func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct {
+	result := &applyEventStruct{dmlEvent: dmlEvent}
+	return result
+}
+
 const (
 	applyEventsQueueBuffer = 100
 )
@@ -71,7 +86,7 @@ type Migrator struct {
 	// copyRowsQueue should not be buffered; if buffered some non-damaging but
 	// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
 	copyRowsQueue    chan tableWriteFunc
-	applyEventsQueue chan tableWriteFunc
+	applyEventsQueue chan *applyEventStruct
 
 	handledChangelogStates map[string]bool
 }
@@ -86,7 +101,7 @@ func NewMigrator() *Migrator {
 		allEventsUpToLockProcessed: make(chan string),
 
 		copyRowsQueue:    make(chan tableWriteFunc),
-		applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
+		applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
 		handledChangelogStates: make(map[string]bool),
 	}
 	return migrator
@@ -194,7 +209,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 	}
 	case AllEventsUpToLockProcessed:
 		{
-			applyEventFunc := func() error {
+			var applyEventFunc tableWriteFunc = func() error {
 				this.allEventsUpToLockProcessed <- changelogStateString
 				return nil
 			}
@@ -204,7 +219,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 			// So as not to create a potential deadlock, we write this func to applyEventsQueue
 			// asynchronously, understanding it doesn't really matter.
 			go func() {
-				this.applyEventsQueue <- applyEventFunc
+				this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
 			}()
 		}
 	default:
@@ -912,11 +927,7 @@ func (this *Migrator) addDMLEventsListener() error {
 		this.migrationContext.DatabaseName,
 		this.migrationContext.OriginalTableName,
 		func(dmlEvent *binlog.BinlogDMLEvent) error {
-			// Create a task to apply the DML event; this will be executed by executeWriteFuncs()
-			applyEventFunc := func() error {
-				return this.applier.ApplyDMLEventQuery(dmlEvent)
-			}
-			this.applyEventsQueue <- applyEventFunc
+			this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent)
 			return nil
 		},
 	)
@@ -1013,6 +1024,55 @@ func (this *Migrator) iterateChunks() error {
 	return nil
 }
 
+func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
+	handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error {
+		if eventStruct.writeFunc != nil {
+			if err := this.retryOperation(*eventStruct.writeFunc); err != nil {
+				return log.Errore(err)
+			}
+		}
+		return nil
+	}
+	if eventStruct.dmlEvent == nil {
+		return handleNonDMLEventStruct(eventStruct)
+	}
+	if eventStruct.dmlEvent != nil {
+		dmlEvents := [](*binlog.BinlogDMLEvent){}
+		dmlEvents = append(dmlEvents, eventStruct.dmlEvent)
+		var nonDmlStructToApply *applyEventStruct
+
+		availableEvents := len(this.applyEventsQueue)
+		batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
+		if availableEvents > batchSize {
+			availableEvents = batchSize
+		}
+		for i := 0; i < availableEvents; i++ {
+			additionalStruct := <-this.applyEventsQueue
+			if additionalStruct.dmlEvent == nil {
+				// Not a DML. We don't group this, and we don't batch any further
+				nonDmlStructToApply = additionalStruct
+				break
+			}
+			dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
+		}
+		// Create a task to apply the DML events; this will be executed by executeWriteFuncs()
+		var applyEventFunc tableWriteFunc = func() error {
+			return this.applier.ApplyDMLEventQueries(dmlEvents)
+		}
+		if err := this.retryOperation(applyEventFunc); err != nil {
+			return log.Errore(err)
+		}
+		if nonDmlStructToApply != nil {
+			// We pulled DML events from the queue, and then we hit a non-DML event. Wait!
+			// We need to handle it!
+			if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil {
+				return log.Errore(err)
+			}
+		}
+	}
+	return nil
+}
+
 // executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
 // This is where the ghost table gets the data. The function fills the data single-threaded.
 // Both event backlog and rowcopy events are polled; the backlog events have precedence.
@@ -1027,10 +1087,10 @@ func (this *Migrator) executeWriteFuncs() error {
 		// We give higher priority to event processing, then secondary priority to
 		// rowcopy
 		select {
-		case applyEventFunc := <-this.applyEventsQueue:
+		case eventStruct := <-this.applyEventsQueue:
 			{
-				if err := this.retryOperation(applyEventFunc); err != nil {
-					return log.Errore(err)
+				if err := this.onApplyEventStruct(eventStruct); err != nil {
+					return err
 				}
 			}
 		default:
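
The `select` with a `default` arm in `executeWriteFuncs()` is what gives the events backlog precedence over rowcopy: only when no event is immediately available does the writer consider other work. A minimal sketch of that prioritization idiom (hypothetical channels, not gh-ost's code):

```go
package main

import "fmt"

func main() {
	events := make(chan string, 10)  // high priority: binlog events
	rowcopy := make(chan string, 10) // low priority: row copy tasks
	events <- "apply binlog event"
	rowcopy <- "copy next chunk"

	for i := 0; i < 2; i++ {
		select {
		case e := <-events:
			// An event is pending: handle it before any rowcopy work.
			fmt.Println("priority:", e)
		default:
			// No event pending right now: take whichever task comes next.
			select {
			case e := <-events:
				fmt.Println("priority:", e)
			case r := <-rowcopy:
				fmt.Println("rowcopy:", r)
			}
		}
	}
}
```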
