Skip to content

Commit 8405b0c

Browse files
author
Shlomi Noach
authored
Merge branch 'master' into close-sync-with-err
2 parents 7a3912d + c1c31fb commit 8405b0c

File tree

7 files changed

+82
-5
lines changed

7 files changed

+82
-5
lines changed

doc/command-line-flags.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,14 @@ See `approve-renamed-columns`
131131

132132
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [testing-on-replica](testing-on-replica.md)
133133

134+
### throttle-control-replicas
135+
136+
Provide a command delimited list of replicas; `gh-ost` will throttle when any of the given replicas lag beyond `--max-lag-millis`. The list can be queried and updated dynamically via [interactive commands](interactive-commands.md)
137+
138+
### throttle-http
139+
140+
Provide a HTTP endpoint; `gh-ost` will issue `HEAD` requests on given URL and throttle whenever response status code is not `200`. The URL can be queried and updated dynamically via [interactive commands](interactive-commands.md). Empty URL disables the HTTP check.
141+
134142
### timestamp-old-table
135143

136144
Makes the _old_ table include a timestamp value. The _old_ table is what the original table is renamed to at the end of a successful migration. For example, if the table is `gh_ost_test`, then the _old_ table would normally be `_gh_ost_test_del`. With `--timestamp-old-table` it would be, for example, `_gh_ost_test_20170221103147_del`.

doc/interactive-commands.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
3131
- `nice-ratio=0.5` will cause `gh-ost` to sleep for `50ms` immediately following.
3232
- `nice-ratio=1` will cause `gh-ost` to sleep for `100ms`, effectively doubling runtime
3333
- value of `2` will effectively triple the runtime; etc.
34+
- `throttle-http`: change throttle HTTP endpoint
3435
- `throttle-query`: change throttle query
3536
- `throttle-control-replicas='replica1,replica2'`: change list of throttle-control replicas, these are replicas `gh-ost` will check. This takes a comma separated list of replica's to check and replaces the previous list.
3637
- `throttle`: force migration suspend

go/base/context.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ const (
4444
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
4545
)
4646

47+
const (
48+
HTTPStatusOK = 200
49+
)
50+
4751
var (
4852
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
4953
)
@@ -99,6 +103,7 @@ type MigrationContext struct {
99103
ThrottleFlagFile string
100104
ThrottleAdditionalFlagFile string
101105
throttleQuery string
106+
throttleHTTP string
102107
ThrottleCommandedByUser int64
103108
maxLoad LoadMap
104109
criticalLoad LoadMap
@@ -148,6 +153,7 @@ type MigrationContext struct {
148153
pointOfInterestTime time.Time
149154
pointOfInterestTimeMutex *sync.Mutex
150155
CurrentLag int64
156+
ThrottleHTTPStatusCode int64
151157
controlReplicasLagResult mysql.ReplicationLagResult
152158
TotalRowsCopied int64
153159
TotalDMLEventsApplied int64
@@ -157,6 +163,7 @@ type MigrationContext struct {
157163
throttleReasonHint ThrottleReasonHint
158164
throttleGeneralCheckResult ThrottleCheckResult
159165
throttleMutex *sync.Mutex
166+
throttleHTTPMutex *sync.Mutex
160167
IsPostponingCutOver int64
161168
CountingRowsFlag int64
162169
AllEventsUpToLockProcessedInjectedFlag int64
@@ -215,6 +222,7 @@ func newMigrationContext() *MigrationContext {
215222
maxLoad: NewLoadMap(),
216223
criticalLoad: NewLoadMap(),
217224
throttleMutex: &sync.Mutex{},
225+
throttleHTTPMutex: &sync.Mutex{},
218226
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
219227
configMutex: &sync.Mutex{},
220228
pointOfInterestTimeMutex: &sync.Mutex{},
@@ -472,12 +480,10 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
472480
}
473481

474482
func (this *MigrationContext) GetThrottleQuery() string {
475-
var query string
476-
477483
this.throttleMutex.Lock()
478484
defer this.throttleMutex.Unlock()
479485

480-
query = this.throttleQuery
486+
var query = this.throttleQuery
481487
return query
482488
}
483489

@@ -488,6 +494,21 @@ func (this *MigrationContext) SetThrottleQuery(newQuery string) {
488494
this.throttleQuery = newQuery
489495
}
490496

497+
func (this *MigrationContext) GetThrottleHTTP() string {
498+
this.throttleHTTPMutex.Lock()
499+
defer this.throttleHTTPMutex.Unlock()
500+
501+
var throttleHTTP = this.throttleHTTP
502+
return throttleHTTP
503+
}
504+
505+
func (this *MigrationContext) SetThrottleHTTP(throttleHTTP string) {
506+
this.throttleHTTPMutex.Lock()
507+
defer this.throttleHTTPMutex.Unlock()
508+
509+
this.throttleHTTP = throttleHTTP
510+
}
511+
491512
func (this *MigrationContext) GetMaxLoad() LoadMap {
492513
this.throttleMutex.Lock()
493514
defer this.throttleMutex.Unlock()

go/cmd/gh-ost/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func main() {
9393
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
9494
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
9595
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
96+
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
9697
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
9798
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
9899
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
@@ -228,6 +229,7 @@ func main() {
228229
migrationContext.SetDMLBatchSize(*dmlBatchSize)
229230
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
230231
migrationContext.SetThrottleQuery(*throttleQuery)
232+
migrationContext.SetThrottleHTTP(*throttleHTTP)
231233
migrationContext.SetDefaultNumRetries(*defaultRetries)
232234
migrationContext.ApplyCredentials()
233235
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {

go/logic/migrator.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func NewMigrator() *Migrator {
9696
migrationContext: base.GetMigrationContext(),
9797
parser: sql.NewParser(),
9898
ghostTableMigrated: make(chan bool),
99-
firstThrottlingCollected: make(chan bool, 1),
99+
firstThrottlingCollected: make(chan bool, 3),
100100
rowCopyComplete: make(chan bool),
101101
allEventsUpToLockProcessed: make(chan string),
102102

@@ -977,7 +977,8 @@ func (this *Migrator) initiateThrottler() error {
977977
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
978978
log.Infof("Waiting for first throttle metrics to be collected")
979979
<-this.firstThrottlingCollected // replication lag
980-
<-this.firstThrottlingCollected // other metrics
980+
<-this.firstThrottlingCollected // HTTP status
981+
<-this.firstThrottlingCollected // other, general metrics
981982
log.Infof("First throttle metrics collected")
982983
go this.throttler.initiateThrottlerChecks()
983984

go/logic/server.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ max-lag-millis=<max-lag> # Set a new replication lag threshold
146146
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
147147
max-load=<load> # Set a new set of max-load thresholds
148148
throttle-query=<query> # Set a new throttle-query (no quotes)
149+
throttle-http=<URL> # Set a new throttle URL
149150
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
150151
throttle # Force throttling
151152
no-throttle # End forced throttling (other throttling may still apply)
@@ -236,6 +237,16 @@ help # This message
236237
fmt.Fprintf(writer, throttleHint)
237238
return ForcePrintStatusAndHintRule, nil
238239
}
240+
case "throttle-http":
241+
{
242+
if argIsQuestion {
243+
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleHTTP())
244+
return NoPrintStatusRule, nil
245+
}
246+
this.migrationContext.SetThrottleHTTP(arg)
247+
fmt.Fprintf(writer, throttleHint)
248+
return ForcePrintStatusAndHintRule, nil
249+
}
239250
case "throttle-control-replicas":
240251
{
241252
if argIsQuestion {

go/logic/throttler.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package logic
77

88
import (
99
"fmt"
10+
"net/http"
1011
"sync/atomic"
1112
"time"
1213

@@ -41,6 +42,11 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint
4142
if generalCheckResult.ShouldThrottle {
4243
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
4344
}
45+
// HTTP throttle
46+
statusCode := atomic.LoadInt64(&this.migrationContext.ThrottleHTTPStatusCode)
47+
if statusCode != 0 && statusCode != http.StatusOK {
48+
return true, fmt.Sprintf("http=%d", statusCode), base.NoThrottleReasonHint
49+
}
4450
// Replication lag throttle
4551
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
4652
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
@@ -213,6 +219,32 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
213219
return false, variableName, value, threshold, nil
214220
}
215221

222+
// collectReplicationLag reads the latest changelog heartbeat value
223+
func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- bool) {
224+
collectFunc := func() (sleep bool, err error) {
225+
url := this.migrationContext.GetThrottleHTTP()
226+
if url == "" {
227+
return true, nil
228+
}
229+
resp, err := http.Head(url)
230+
if err != nil {
231+
return false, err
232+
}
233+
atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode))
234+
return false, nil
235+
}
236+
237+
collectFunc()
238+
firstThrottlingCollected <- true
239+
240+
ticker := time.Tick(100 * time.Millisecond)
241+
for range ticker {
242+
if sleep, _ := collectFunc(); sleep {
243+
time.Sleep(1 * time.Second)
244+
}
245+
}
246+
}
247+
216248
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
217249
func (this *Throttler) collectGeneralThrottleMetrics() error {
218250

@@ -290,6 +322,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
290322
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
291323
go this.collectReplicationLag(firstThrottlingCollected)
292324
go this.collectControlReplicasLag()
325+
go this.collectThrottleHTTPStatus(firstThrottlingCollected)
293326

294327
go func() {
295328
this.collectGeneralThrottleMetrics()

0 commit comments

Comments
 (0)