
Commit 2afb86b

Author: Shlomi Noach
support for millisecond throttling
- `--max-lag-millis` is now at least `100ms`
- `--heartbeat-interval-millis` introduced; defaults to `500ms`, can range `100ms` - `1s`
- Control replicas lag is calculated asynchronously to the throttle test
- Aggressive when `max-lag-millis < 1000` and when `replication-lag-query` is given
1 parent: b74804e

File tree

5 files changed: +77 -21 lines changed

- doc/interactive-commands.md
- go/base/context.go
- go/cmd/gh-ost/main.go
- go/logic/applier.go
- go/logic/migrator.go

doc/interactive-commands.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -18,7 +18,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
 - `status`: returns a detailed status summary of migration progress and configuration
 - `sup`: returns a brief status summary of migration progress
 - `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
-- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `1000`, i.e. 1 second)
+- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second)
 - `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
 The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`. For example: `Threads_running=50,threads_connected=1000`, and you would then write/echo `max-load=Threads_running=50,threads_connected=1000` to the socket.
 - `critical-load=<load>`: change critical load setting (exceeding given thresholds causes panic and abort)
```
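
The lowered minimum means sub-second lag thresholds can now be pushed to a running migration. As a minimal sketch (not part of this commit), the command can be sent over gh-ost's interactive unix socket; the socket path below follows the default pattern from `main.go`, and the schema/table names and value are placeholders:

```go
// Sketch: send an interactive command to a running gh-ost migration over its
// unix socket. The socket path and the value are illustrative assumptions.
package main

import (
    "fmt"
    "io"
    "net"
    "os"
)

func main() {
    conn, err := net.Dial("unix", "/tmp/gh-ost.my_schema.my_table.sock")
    if err != nil {
        fmt.Fprintln(os.Stderr, "dial:", err)
        os.Exit(1)
    }
    defer conn.Close()

    // With this commit, values as low as 100 (0.1s) are accepted.
    fmt.Fprintln(conn, "max-lag-millis=300")
    io.Copy(os.Stdout, conn) // print whatever the server echoes back
}
```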

go/base/context.go

Lines changed: 26 additions & 0 deletions
```diff
@@ -64,6 +64,7 @@ type MigrationContext struct {
     CliUser string
     CliPassword string
 
+    HeartbeatIntervalMilliseconds int64
     defaultNumRetries int64
     ChunkSize int64
     niceRatio float64
@@ -111,6 +112,7 @@ type MigrationContext struct {
     pointOfInterestTime time.Time
     pointOfInterestTimeMutex *sync.Mutex
     CurrentLag int64
+    controlReplicasLagResult mysql.ReplicationLagResult
     TotalRowsCopied int64
     TotalDMLEventsApplied int64
     isThrottled bool
@@ -323,6 +325,16 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
     return time.Since(this.pointOfInterestTime)
 }
 
+func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) {
+    if heartbeatIntervalMilliseconds < 100 {
+        heartbeatIntervalMilliseconds = 100
+    }
+    if heartbeatIntervalMilliseconds > 1000 {
+        heartbeatIntervalMilliseconds = 1000
+    }
+    this.HeartbeatIntervalMilliseconds = heartbeatIntervalMilliseconds
+}
+
 func (this *MigrationContext) SetMaxLagMillisecondsThrottleThreshold(maxLagMillisecondsThrottleThreshold int64) {
     if maxLagMillisecondsThrottleThreshold < 100 {
         maxLagMillisecondsThrottleThreshold = 100
@@ -451,6 +463,20 @@ func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error {
     return nil
 }
 
+func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLagResult {
+    this.throttleMutex.Lock()
+    defer this.throttleMutex.Unlock()
+
+    lagResult := this.controlReplicasLagResult
+    return lagResult
+}
+
+func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) {
+    this.throttleMutex.Lock()
+    defer this.throttleMutex.Unlock()
+    this.controlReplicasLagResult = *lagResult
+}
+
 func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap {
     this.throttleMutex.Lock()
     defer this.throttleMutex.Unlock()
```
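
Because the control-replicas lag is now sampled by a background reader and consumed by the throttle check, the new accessors copy the result under `throttleMutex`. A condensed, standalone sketch of that pattern, using simplified stand-in types rather than gh-ost's own:

```go
// Sketch of the mutex-guarded lag-result accessors: one goroutine writes the
// latest measurement, another reads it; the getter returns a copy taken under lock.
package main

import (
    "fmt"
    "sync"
    "time"
)

type ReplicationLagResult struct { // simplified stand-in for mysql.ReplicationLagResult
    Lag time.Duration
    Err error
}

type migrationContext struct {
    throttleMutex            sync.Mutex
    controlReplicasLagResult ReplicationLagResult
}

func (c *migrationContext) SetControlReplicasLagResult(r ReplicationLagResult) {
    c.throttleMutex.Lock()
    defer c.throttleMutex.Unlock()
    c.controlReplicasLagResult = r
}

func (c *migrationContext) GetControlReplicasLagResult() ReplicationLagResult {
    c.throttleMutex.Lock()
    defer c.throttleMutex.Unlock()
    return c.controlReplicasLagResult
}

func main() {
    c := &migrationContext{}
    c.SetControlReplicasLagResult(ReplicationLagResult{Lag: 250 * time.Millisecond})
    fmt.Println(c.GetControlReplicasLagResult().Lag) // 250ms
}
```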

go/cmd/gh-ost/main.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -81,6 +81,7 @@ func main() {
     replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
     throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
     throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
+    heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value")
     flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
     flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
     flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@@ -181,6 +182,7 @@ func main() {
     if migrationContext.ServeSocketFile == "" {
         migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
     }
+    migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
     migrationContext.SetNiceRatio(*niceRatio)
     migrationContext.SetChunkSize(*chunkSize)
     migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
```

go/logic/applier.go

Lines changed: 3 additions & 3 deletions
```diff
@@ -258,7 +258,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
 
 // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
 // This is done asynchronously
-func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
+func (this *Applier) InitiateHeartbeat() {
     var numSuccessiveFailures int64
     injectHeartbeat := func() error {
         if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
@@ -273,10 +273,10 @@ func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
     }
     injectHeartbeat()
 
-    heartbeatTick := time.Tick(time.Duration(heartbeatIntervalMilliseconds) * time.Millisecond)
+    heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
     for range heartbeatTick {
         // Generally speaking, we would issue a goroutine, but I'd actually rather
-        // have this blocked rather than spam the master in the event something
+        // have this block the loop rather than spam the master in the event something
         // goes wrong
         if err := injectHeartbeat(); err != nil {
             return
```
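
The heartbeat value written here is an `RFC3339Nano` timestamp; on the read side (`parseChangelogHeartbeat` in the migrator diff below) lag is essentially the elapsed time since that timestamp, which is what makes millisecond-level thresholds meaningful. A tiny sketch of that round trip, with no database involved:

```go
// Sketch of the heartbeat round trip: the applier stores time.Now() in
// RFC3339Nano form; the migrator later parses it and derives the current lag.
package main

import (
    "fmt"
    "time"
)

func main() {
    // Write side: what InitiateHeartbeat stores in the changelog table.
    heartbeatValue := time.Now().Format(time.RFC3339Nano)

    time.Sleep(250 * time.Millisecond) // stand-in for replication delay

    // Read side: what parseChangelogHeartbeat does with the intercepted value.
    heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
    if err != nil {
        panic(err)
    }
    lag := time.Since(heartbeatTime)
    fmt.Printf("current lag: %dms\n", lag.Milliseconds())
}
```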

go/logic/migrator.go

Lines changed: 45 additions & 17 deletions
```diff
@@ -36,8 +36,7 @@ const (
 type tableWriteFunc func() error
 
 const (
-    applyEventsQueueBuffer        = 100
-    heartbeatIntervalMilliseconds = 1000
+    applyEventsQueueBuffer = 100
 )
 
 type PrintStatusRule int
@@ -159,7 +158,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
         checkThrottleControlReplicas = false
     }
     if checkThrottleControlReplicas {
-        lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
+        lagResult := this.migrationContext.GetControlReplicasLagResult()
         if lagResult.Err != nil {
             return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
         }
@@ -328,8 +327,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
     return nil
 }
 
-// onChangelogHeartbeat is called when a heartbeat event is intercepted
-func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
+// parseChangelogHeartbeat is called when a heartbeat event is intercepted
+func (this *Migrator) parseChangelogHeartbeat(heartbeatValue string) (err error) {
     heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
     if err != nil {
         return log.Errore(err)
@@ -427,7 +426,8 @@ func (this *Migrator) Migrate() (err error) {
     if err := this.addDMLEventsListener(); err != nil {
         return err
     }
-    go this.initiateHeartbeatListener()
+    go this.initiateHeartbeatReader()
+    go this.initiateControlReplicasReader()
 
     if err := this.applier.ReadMigrationRangeValues(); err != nil {
         return err
@@ -1045,11 +1045,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
     fmt.Fprintln(w, status)
 }
 
-// initiateHeartbeatListener listens for heartbeat events. gh-ost implements its own
+// initiateHeartbeatReader listens for heartbeat events. gh-ost implements its own
 // heartbeat mechanism, whether your DB has or hasn't an existing heartbeat solution.
 // Heartbeat is supplied via the changelog table
-func (this *Migrator) initiateHeartbeatListener() {
-    ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
+func (this *Migrator) initiateHeartbeatReader() {
+    ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
     for range ticker {
         go func() error {
             if atomic.LoadInt64(&this.cleanupImminentFlag) > 0 {
@@ -1059,19 +1059,47 @@ func (this *Migrator) initiateHeartbeatListener() {
             if err != nil {
                 return log.Errore(err)
             }
-            for hint, value := range changelogState {
-                switch hint {
-                case "heartbeat":
-                    {
-                        this.onChangelogHeartbeat(value)
-                    }
-                }
+            if heartbeatValue, ok := changelogState["heartbeat"]; ok {
+                this.parseChangelogHeartbeat(heartbeatValue)
             }
             return nil
         }()
     }
 }
 
+// initiateControlReplicasReader
+func (this *Migrator) initiateControlReplicasReader() {
+    readControlReplicasLag := func(replicationLagQuery string) error {
+        if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) {
+            return nil
+        }
+        lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), replicationLagQuery)
+        this.migrationContext.SetControlReplicasLagResult(lagResult)
+        return nil
+    }
+    aggressiveTicker := time.Tick(100 * time.Millisecond)
+    relaxedFactor := 10
+    counter := 0
+    shouldReadLagAggressively := false
+    replicationLagQuery := ""
+
+    for range aggressiveTicker {
+        if counter%relaxedFactor == 0 {
+            // we only check if we wish to be aggressive once per second. The parameters for being aggressive
+            // do not typically change at all throughout the migration, but nonetheless we check them.
+            counter = 0
+            maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
+            replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
+            shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
+        }
+        if counter == 0 || shouldReadLagAggressively {
+            // We check replication lag every so often, or if we wish to be aggressive
+            readControlReplicasLag(replicationLagQuery)
+        }
+        counter++
+    }
+}
+
 // initiateStreaming begins treaming of binary log events and registers listeners for such events
 func (this *Migrator) initiateStreaming() error {
     this.eventsStreamer = NewEventsStreamer()
@@ -1139,7 +1167,7 @@ func (this *Migrator) initiateApplier() error {
     }
 
     this.applier.WriteChangelogState(string(TablesInPlace))
-    go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds)
+    go this.applier.InitiateHeartbeat()
     return nil
 }
```
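
The new `initiateControlReplicasReader` ticks every `100ms`, but only queries the control replicas that aggressively when a `replication-lag-query` is configured and `max-lag-millis` is below `1000`; otherwise it reads once per second (every `relaxedFactor` ticks). A self-contained sketch of that cadence, with a stubbed lag reader and fixed settings standing in for the migration context:

```go
// Standalone sketch of the aggressive/relaxed polling cadence used by
// initiateControlReplicasReader. The lag reader is a stub; the two settings
// below stand in for max-lag-millis and replication-lag-query.
package main

import (
    "fmt"
    "time"
)

func main() {
    maxLagMillis := int64(300)          // assumed runtime setting
    replicationLagQuery := "SELECT ..." // non-empty means a custom lag query is configured

    readControlReplicasLag := func() {
        fmt.Println(time.Now().Format("15:04:05.000"), "reading control replicas lag")
    }

    aggressiveTicker := time.Tick(100 * time.Millisecond)
    relaxedFactor := 10
    counter := 0
    shouldReadLagAggressively := false

    for ticks := 0; ticks < 30; ticks++ { // bounded loop so the demo terminates
        <-aggressiveTicker
        if counter%relaxedFactor == 0 {
            // Re-evaluate the aggressiveness conditions once per second; in gh-ost these
            // come from the migration context and rarely change mid-migration.
            counter = 0
            shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillis < 1000)
        }
        if counter == 0 || shouldReadLagAggressively {
            // Relaxed mode: one read per second. Aggressive mode: a read every 100ms.
            readControlReplicasLag()
        }
        counter++
    }
}
```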
