Skip to content

Commit 2f4d9b8

Browse files
author
Shlomi Noach
authored
Merge pull request #109 from github/replication-lag-query-dynamic
dynamic replication-lag-query
2 parents 5d23b72 + b53ee24 commit 2f4d9b8

File tree

4 files changed

+54
-16
lines changed

4 files changed

+54
-16
lines changed

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22
#
33
#
4-
RELEASE_VERSION="1.0.5"
4+
RELEASE_VERSION="1.0.6"
55

66
buildpath=/tmp/gh-ost
77
target=gh-ost

go/base/context.go

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ type MigrationContext struct {
6565
ChunkSize int64
6666
NiceRatio int64
6767
MaxLagMillisecondsThrottleThreshold int64
68-
ReplictionLagQuery string
69-
ThrottleControlReplicaKeys *mysql.InstanceKeyMap
68+
replicationLagQuery string
69+
throttleControlReplicaKeys *mysql.InstanceKeyMap
7070
ThrottleFlagFile string
7171
ThrottleAdditionalFlagFile string
72-
ThrottleQuery string
72+
throttleQuery string
7373
ThrottleCommandedByUser int64
7474
maxLoad LoadMap
7575
criticalLoad LoadMap
@@ -159,7 +159,7 @@ func newMigrationContext() *MigrationContext {
159159
maxLoad: NewLoadMap(),
160160
criticalLoad: NewLoadMap(),
161161
throttleMutex: &sync.Mutex{},
162-
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
162+
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
163163
configMutex: &sync.Mutex{},
164164
pointOfInterestTimeMutex: &sync.Mutex{},
165165
ColumnRenameMap: make(map[string]string),
@@ -334,21 +334,38 @@ func (this *MigrationContext) IsThrottled() (bool, string) {
334334
return this.isThrottled, this.throttleReason
335335
}
336336

337+
// GetReplicationLagQuery returns the current value of the replicationLagQuery
// field. The field is read while holding throttleMutex, the same mutex that
// SetReplicationLagQuery holds when it writes the field.
func (this *MigrationContext) GetReplicationLagQuery() string {
338+
var query string
339+
340+
this.throttleMutex.Lock()
341+
defer this.throttleMutex.Unlock()
342+
343+
query = this.replicationLagQuery
344+
return query
345+
}
346+
347+
// SetReplicationLagQuery stores newQuery in the replicationLagQuery field.
// The write is done while holding throttleMutex; the value is stored as given,
// with no validation in this method.
func (this *MigrationContext) SetReplicationLagQuery(newQuery string) {
348+
this.throttleMutex.Lock()
349+
defer this.throttleMutex.Unlock()
350+
351+
this.replicationLagQuery = newQuery
352+
}
353+
337354
// GetThrottleQuery returns the current throttle query, read while holding
// throttleMutex. In this diff the backing field changes from the exported
// ThrottleQuery (removed line) to the unexported throttleQuery (added line).
func (this *MigrationContext) GetThrottleQuery() string {
338355
var query string
339356

340357
this.throttleMutex.Lock()
341358
defer this.throttleMutex.Unlock()
342359

343-
query = this.ThrottleQuery
360+
query = this.throttleQuery
344361
return query
345362
}
346363

347364
// SetThrottleQuery stores newQuery as the throttle query while holding
// throttleMutex. In this diff the backing field changes from the exported
// ThrottleQuery (removed line) to the unexported throttleQuery (added line).
func (this *MigrationContext) SetThrottleQuery(newQuery string) {
348365
this.throttleMutex.Lock()
349366
defer this.throttleMutex.Unlock()
350367

351-
this.ThrottleQuery = newQuery
368+
this.throttleQuery = newQuery
352369
}
353370

354371
func (this *MigrationContext) GetMaxLoad() LoadMap {
@@ -400,7 +417,7 @@ func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKey
400417
defer this.throttleMutex.Unlock()
401418

402419
keys := mysql.NewInstanceKeyMap()
403-
keys.AddKeys(this.ThrottleControlReplicaKeys.GetInstanceKeys())
420+
keys.AddKeys(this.throttleControlReplicaKeys.GetInstanceKeys())
404421
return keys
405422
}
406423

@@ -413,7 +430,15 @@ func (this *MigrationContext) ReadThrottleControlReplicaKeys(throttleControlRepl
413430
this.throttleMutex.Lock()
414431
defer this.throttleMutex.Unlock()
415432

416-
this.ThrottleControlReplicaKeys = keys
433+
this.throttleControlReplicaKeys = keys
434+
return nil
435+
}
436+
437+
// AddThrottleControlReplicaKey passes key to throttleControlReplicaKeys.AddKey
// while holding throttleMutex. The only return statement is `return nil`, so
// the error result is always nil.
func (this *MigrationContext) AddThrottleControlReplicaKey(key mysql.InstanceKey) error {
438+
this.throttleMutex.Lock()
439+
defer this.throttleMutex.Unlock()
440+
441+
this.throttleControlReplicaKeys.AddKey(key)
417442
return nil
418443
}
419444

go/cmd/gh-ost/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ func main() {
7575
flag.Int64Var(&migrationContext.NiceRatio, "nice-ratio", 0, "force being 'nice', imply sleep time per chunk time. Example values: 0 is aggressive. 3: for every ms spend in a rowcopy chunk, spend 3ms sleeping immediately after")
7676

7777
maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
78-
flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
78+
replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
7979
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
80-
flag.StringVar(&migrationContext.ThrottleQuery, "throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
80+
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
8181
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
8282
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
8383
flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@@ -171,6 +171,8 @@ func main() {
171171
}
172172
migrationContext.SetChunkSize(*chunkSize)
173173
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
174+
migrationContext.SetReplicationLagQuery(*replicationLagQuery)
175+
migrationContext.SetThrottleQuery(*throttleQuery)
174176
migrationContext.SetDefaultNumRetries(*defaultRetries)
175177
migrationContext.ApplyCredentials()
176178
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {

go/logic/migrator.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
158158
checkThrottleControlReplicas = false
159159
}
160160
if checkThrottleControlReplicas {
161-
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
161+
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
162162
if err != nil {
163163
return true, err.Error()
164164
}
@@ -661,9 +661,10 @@ chunk-size=<newsize> # Set a new chunk-size
661661
nice-ratio=<ratio> # Set a new nice-ratio, integer (0 is agrressive)
662662
critical-load=<load> # Set a new set of max-load thresholds
663663
max-lag-millis=<max-lag> # Set a new replication lag threshold
664+
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
664665
max-load=<load> # Set a new set of max-load thresholds
665-
throttle-query=<query> # Set a new throttle-query
666-
throttle-control-replicas=<replicas> #
666+
throttle-query=<query> # Set a new throttle-query (no quotes)
667+
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
667668
throttle # Force throttling
668669
no-throttle # End forced throttling (other throttling may still apply)
669670
unpostpone # Bail out a cut-over postpone; proceed to cut-over
@@ -693,6 +694,11 @@ help # This message
693694
this.printStatus(ForcePrintStatusAndHint, writer)
694695
}
695696
}
697+
case "replication-lag-query":
698+
{
699+
this.migrationContext.SetReplicationLagQuery(arg)
700+
this.printStatus(ForcePrintStatusAndHint, writer)
701+
}
696702
case "nice-ratio":
697703
{
698704
if niceRatio, err := strconv.Atoi(arg); err != nil {
@@ -809,8 +815,8 @@ func (this *Migrator) initiateInspector() (err error) {
809815
*this.migrationContext.ApplierConnectionConfig.ImpliedKey, *this.migrationContext.InspectorConnectionConfig.ImpliedKey,
810816
)
811817
this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
812-
if this.migrationContext.ThrottleControlReplicaKeys.Len() == 0 {
813-
this.migrationContext.ThrottleControlReplicaKeys.AddKey(this.migrationContext.InspectorConnectionConfig.Key)
818+
if this.migrationContext.GetThrottleControlReplicaKeys().Len() == 0 {
819+
this.migrationContext.AddThrottleControlReplicaKey(this.migrationContext.InspectorConnectionConfig.Key)
814820
}
815821
} else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster {
816822
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
@@ -860,6 +866,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
860866
criticalLoad.String(),
861867
atomic.LoadInt64(&this.migrationContext.NiceRatio),
862868
))
869+
if replicationLagQuery := this.migrationContext.GetReplicationLagQuery(); replicationLagQuery != "" {
870+
fmt.Fprintln(w, fmt.Sprintf("# Replication lag query: %+v",
871+
replicationLagQuery,
872+
))
873+
}
863874
if this.migrationContext.ThrottleFlagFile != "" {
864875
fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v",
865876
this.migrationContext.ThrottleFlagFile,

0 commit comments

Comments
 (0)