
Commit e17f41b

Authored by Shlomi Noach
Merge pull request #347 from github/heartbeat-control-replicas-unified
Reading replication lag via _changelog_ table, also on control replicas
2 parents 6bcf515 + 914f692 commit e17f41b

10 files changed: +93 −123 lines changed

doc/command-line-flags.md

Lines changed: 1 addition & 4 deletions
@@ -100,10 +100,7 @@ On a replication topology, this is perhaps the most important migration throttli
 
 When using [Connect to replica, migrate on master](cheatsheet.md), this lag is primarily tested on the very replica `gh-ost` operates on. Lag is measured by checking the heartbeat events injected by `gh-ost` itself on the utility changelog table. That is, to measure this replica's lag, `gh-ost` doesn't need to issue `show slave status` nor have any external heartbeat mechanism.
 
-When `--throttle-control-replicas` is provided, throttling also considers lag on specified hosts. Measuring lag on these hosts works as follows:
-
-- If `--replication-lag-query` is provided, use the query, trust its result to indicate lag seconds (fraction, i.e. float, allowed)
-- Otherwise, issue `show slave status` and read `Seconds_behind_master` (`1sec` granularity)
+When `--throttle-control-replicas` is provided, throttling also considers lag on the specified hosts. Lag measurement on the listed hosts is done by querying `gh-ost`'s _changelog_ table, where `gh-ost` injects a heartbeat.
 
 See also: [Sub-second replication lag throttling](subsecond-lag.md)
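Editor's note: the change above replaces `SHOW SLAVE STATUS` with `gh-ost`'s own heartbeat as the lag source on control replicas. As a minimal, illustrative Go sketch of that idea — the query shape and the RFC3339Nano timestamp format mirror go/logic/throttler.go in this commit, while the DSN, schema name and changelog table name below are placeholder assumptions, not values from the commit:

package main

import (
    "database/sql"
    "fmt"
    "time"

    _ "github.com/go-sql-driver/mysql" // MySQL driver registered with database/sql
)

// controlReplicaLag reads the latest heartbeat gh-ost wrote into its changelog
// table on a control replica and returns "now minus heartbeat" as that replica's lag.
func controlReplicaLag(db *sql.DB, schema, changelogTable string) (time.Duration, error) {
    query := fmt.Sprintf(
        "select value from `%s`.`%s` where hint = 'heartbeat' and id <= 255",
        schema, changelogTable,
    )
    var heartbeatValue string
    if err := db.QueryRow(query).Scan(&heartbeatValue); err != nil {
        return 0, err
    }
    // gh-ost stores the heartbeat as an RFC3339Nano timestamp.
    heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
    if err != nil {
        return 0, err
    }
    return time.Since(heartbeatTime), nil
}

func main() {
    // Placeholder DSN pointing at a control replica.
    db, err := sql.Open("mysql", "gh-ost:secret@tcp(replica1.example.com:3306)/information_schema")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // "_mytable_ghc" stands in for the migration's changelog table name.
    lag, err := controlReplicaLag(db, "mydb", "_mytable_ghc")
    if err != nil {
        panic(err)
    }
    fmt.Printf("replication lag: %v\n", lag)
}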

doc/subsecond-lag.md

Lines changed: 4 additions & 18 deletions
@@ -2,32 +2,18 @@
 
 `gh-ost` is able to utilize sub-second replication lag measurements.
 
-At GitHub, small replication lag is crucial, and we like to keep it below `1s` at all times. If you have similar concern, we strongly urge you to proceed to implement sub-second lag throttling.
+At GitHub, small replication lag is crucial, and we like to keep it below `1s` at all times.
 
 `gh-ost` will do sub-second throttling when `--max-lag-millis` is smaller than `1000`, i.e. smaller than `1sec`.
 Replication lag is measured on:
 
 - The "inspected" server (the server `gh-ost` connects to; replica is desired but not mandatory)
 - The `throttle-control-replicas` list
 
-For the inspected server, `gh-ost` uses an internal heartbeat mechanism. It injects heartbeat events onto the utility changelog table, then reads those events in the binary log, and compares times. This measurement is by default and by definition sub-second enabled.
+In both cases, `gh-ost` uses an internal heartbeat mechanism. It injects heartbeat events onto the utility changelog table, then reads those entries on replicas, and compares times. This measurement is on by default and by definition supports sub-second resolution.
 
 You can explicitly define how frequently will `gh-ost` inject heartbeat events, via `heartbeat-interval-millis`. You should set `heartbeat-interval-millis <= max-lag-millis`. It still works if not, but loses granularity and effect.
 
-On the `throttle-control-replicas`, `gh-ost` only issues SQL queries, and does not attempt to read the binary log stream. Perhaps those other replicas don't have binary logs in the first place.
+In earlier versions, the `--throttle-control-replicas` list was subject to `1` second resolution or to 3rd party heartbeat injections such as `pt-heartbeat`. This is no longer the case. The argument `--replication-lag-query` has been deprecated and is no longer needed.
 
-The standard way of getting replication lag on a replica is to issue `SHOW SLAVE STATUS`, then reading `Seconds_behind_master` value. But that value has a `1sec` granularity.
-
-To be able to throttle on your production replicas fleet when replication lag exceeds a sub-second threshold, you must provide a `replication-lag-query` that returns a sub-second resolution lag.
-
-As a common example, many use [pt-heartbeat](https://www.percona.com/doc/percona-toolkit/2.2/pt-heartbeat.html) to inject heartbeat events on the master. You would issue something like:
-
-    /usr/bin/pt-heartbeat -- -D your_schema --create-table --update --replace --interval=0.1 --daemonize --pid ...
-
-Note `--interval=0.1` to indicate `10` heartbeats per second.
-
-You would then provide
-
-    gh-ost ... --replication-lag-query="select unix_timestamp(now(6)) - unix_timestamp(ts) as ghost_lag_check from your_schema.heartbeat order by ts desc limit 1"
-
-Our production migrations use sub-second lag throttling and are able to keep our entire fleet of replicas well below `1sec` lag.
+Our production migrations use sub-second lag throttling and are able to keep our entire fleet of replicas well below `1sec` lag. We use `--heartbeat-interval-millis=100` on our production migrations with a `--max-lag-millis` value of between `300` and `500`.
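Editor's note, as a usage illustration only (not part of this commit): with the behavior described above, a sub-second throttling setup needs nothing beyond the standard flags. The flags shown are the ones defined in go/cmd/gh-ost/main.go; host, schema and table names are placeholders:

    gh-ost --database=mydb --table=mytable --alter="ENGINE=InnoDB" --max-lag-millis=500 --heartbeat-interval-millis=100 --throttle-control-replicas="replica1.example.com:3306,replica2.example.com:3306" ...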

doc/throttle.md

Lines changed: 1 addition & 9 deletions
@@ -28,15 +28,7 @@ Otherwise you may specify your own list of replica servers you wish it to observ
 
 - `--max-lag-millis`: maximum allowed lag; any controlled replica lagging more than this value will cause throttling to kick in. When all control replicas have smaller lag than indicated, operation resumes.
 
-- `--replication-lag-query`: `gh-ost` will, by default, issue a `show slave status` query to find replication lag. However, this is a notoriously flaky value. If you're using your own `heartbeat` mechanism, e.g. via [`pt-heartbeat`](https://www.percona.com/doc/percona-toolkit/2.2/pt-heartbeat.html), you may provide your own custom query to return a single decimal (floating point) value indicating replication lag.
-
-  Example: `--replication-lag-query="SELECT UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts)) AS lag FROM mydb.heartbeat"`
-
-  We encourage you to use [sub-second replication lag throttling](subsecond-lag.md). Your query may then look like:
-
-  `--replication-lag-query="SELECT UNIX_TIMESTAMP(6) - MAX(UNIX_TIMESTAMP(ts)) AS lag FROM mydb.heartbeat"`
-
-Note that you may dynamically change both `replication-lag-query` and the `throttle-control-replicas` list via [interactive commands](interactive-commands.md)
+Note that you may dynamically change both `--max-lag-millis` and the `throttle-control-replicas` list via [interactive commands](interactive-commands.md)
 
 #### Status thresholds
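Editor's note, for illustration only (not part of this commit): such dynamic changes go through `gh-ost`'s interactive commands interface. Assuming the default `--serve-socket-file` location (the socket path below is an assumption; adjust it to your migration), the commands might look like:

    echo "max-lag-millis=500" | nc -U /tmp/gh-ost.mydb.mytable.sock
    echo "throttle-control-replicas=replica1.example.com:3306,replica2.example.com:3306" | nc -U /tmp/gh-ost.mydb.mytable.sock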

go/base/context.go

Lines changed: 0 additions & 18 deletions
@@ -95,7 +95,6 @@ type MigrationContext struct {
 	ChunkSize                           int64
 	niceRatio                           float64
 	MaxLagMillisecondsThrottleThreshold int64
-	replicationLagQuery                 string
 	throttleControlReplicaKeys          *mysql.InstanceKeyMap
 	ThrottleFlagFile                    string
 	ThrottleAdditionalFlagFile          string
@@ -456,23 +455,6 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
 	return this.isThrottled, this.throttleReason, this.throttleReasonHint
 }
 
-func (this *MigrationContext) GetReplicationLagQuery() string {
-	var query string
-
-	this.throttleMutex.Lock()
-	defer this.throttleMutex.Unlock()
-
-	query = this.replicationLagQuery
-	return query
-}
-
-func (this *MigrationContext) SetReplicationLagQuery(newQuery string) {
-	this.throttleMutex.Lock()
-	defer this.throttleMutex.Unlock()
-
-	this.replicationLagQuery = newQuery
-}
-
 func (this *MigrationContext) GetThrottleQuery() string {
 	var query string
 

go/cmd/gh-ost/main.go

Lines changed: 5 additions & 3 deletions
@@ -88,10 +88,10 @@ func main() {
 	niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
 
 	maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
-	replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
+	replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
 	throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
 	throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
-	heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value")
+	heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
 	flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
 	flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
 	flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@@ -185,6 +185,9 @@
 	if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
 		log.Fatalf("--master-password requires --assume-master-host")
 	}
+	if *replicationLagQuery != "" {
+		log.Warningf("--replication-lag-query is deprecated")
+	}
 
 	switch *cutOver {
 	case "atomic", "default", "":
@@ -221,7 +224,6 @@
 	migrationContext.SetNiceRatio(*niceRatio)
 	migrationContext.SetChunkSize(*chunkSize)
 	migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
-	migrationContext.SetReplicationLagQuery(*replicationLagQuery)
 	migrationContext.SetThrottleQuery(*throttleQuery)
 	migrationContext.SetDefaultNumRetries(*defaultRetries)
 	migrationContext.ApplyCredentials()

go/logic/inspect.go

Lines changed: 5 additions & 7 deletions
@@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s
 }
 
 // readChangelogState reads changelog hints
-func (this *Inspector) readChangelogState() (map[string]string, error) {
+func (this *Inspector) readChangelogState(hint string) (string, error) {
 	query := fmt.Sprintf(`
-		select hint, value from %s.%s where id <= 255
+		select hint, value from %s.%s where hint = ? and id <= 255
 		`,
 		sql.EscapeName(this.migrationContext.DatabaseName),
 		sql.EscapeName(this.migrationContext.GetChangelogTableName()),
 	)
-	result := make(map[string]string)
+	result := ""
 	err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
-		result[m.GetString("hint")] = m.GetString("value")
+		result = m.GetString("value")
 		return nil
-	})
+	}, hint)
 	return result, err
 }
 
@@ -702,10 +702,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect
 }
 
 func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
-	replicationLagQuery := this.migrationContext.GetReplicationLagQuery()
 	replicationLag, err = mysql.GetReplicationLag(
 		this.migrationContext.InspectorConnectionConfig,
-		replicationLagQuery,
 	)
 	return replicationLag, err
 }
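Editor's note, for context on the `readChangelogState` change above (illustration only, not part of this commit): the parameterized query selects the `value` of the requested hint row, and for the `heartbeat` hint that value is an RFC3339Nano timestamp (as parsed in go/logic/throttler.go). The changelog table name and sample value below are made-up placeholders:

    mysql> select hint, value from mydb._mytable_ghc where hint = 'heartbeat' and id <= 255;
    +-----------+--------------------------------+
    | hint      | value                          |
    +-----------+--------------------------------+
    | heartbeat | 2016-06-17T09:15:30.123456789Z |
    +-----------+--------------------------------+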

go/logic/migrator.go

Lines changed: 0 additions & 5 deletions
@@ -723,11 +723,6 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
 		criticalLoad.String(),
 		this.migrationContext.GetNiceRatio(),
 	))
-	if replicationLagQuery := this.migrationContext.GetReplicationLagQuery(); replicationLagQuery != "" {
-		fmt.Fprintln(w, fmt.Sprintf("# replication-lag-query: %+v",
-			replicationLagQuery,
-		))
-	}
 	if this.migrationContext.ThrottleFlagFile != "" {
 		setIndicator := ""
 		if base.FileExists(this.migrationContext.ThrottleFlagFile) {

go/logic/server.go

Lines changed: 1 addition & 2 deletions
@@ -178,8 +178,7 @@ help # This message
 		}
 	case "replication-lag-query":
 		{
-			this.migrationContext.SetReplicationLagQuery(arg)
-			return ForcePrintStatusAndHintRule, nil
+			return NoPrintStatusRule, fmt.Errorf("replication-lag-query is deprecated. gh-ost uses an internal, subsecond resolution query")
 		}
 	case "nice-ratio":
 		{

go/logic/throttler.go

Lines changed: 75 additions & 22 deletions
@@ -12,7 +12,9 @@ import (
 
 	"github.com/github/gh-ost/go/base"
 	"github.com/github/gh-ost/go/mysql"
+	"github.com/github/gh-ost/go/sql"
 	"github.com/outbrain/golib/log"
+	"github.com/outbrain/golib/sqlutils"
 )
 
 // Throttler collects metrics related to throttling and makes informed decisison
@@ -62,15 +64,24 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint
 	return false, "", base.NoThrottleReasonHint
 }
 
-// parseChangelogHeartbeat is called when a heartbeat event is intercepted
-func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
+// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
+func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err error) {
 	heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
 	if err != nil {
+		return lag, err
+	}
+	lag = time.Since(heartbeatTime)
+	return lag, nil
+}
+
+// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
+func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
+	if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil {
 		return log.Errore(err)
+	} else {
+		atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
+		return nil
 	}
-	lag := time.Since(heartbeatTime)
-	atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
-	return nil
 }
 
 // collectHeartbeat reads the latest changelog heartbeat value
@@ -81,11 +92,9 @@ func (this *Throttler) collectHeartbeat() {
 		if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
 			return nil
 		}
-		changelogState, err := this.inspector.readChangelogState()
-		if err != nil {
+		if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
 			return log.Errore(err)
-		}
-		if heartbeatValue, ok := changelogState["heartbeat"]; ok {
+		} else {
 			this.parseChangelogHeartbeat(heartbeatValue)
 		}
 		return nil
@@ -95,36 +104,80 @@ func (this *Throttler) collectHeartbeat() {
 
 // collectControlReplicasLag polls all the control replicas to get maximum lag value
 func (this *Throttler) collectControlReplicasLag() {
-	readControlReplicasLag := func(replicationLagQuery string) error {
+
+	replicationLagQuery := fmt.Sprintf(`
+		select value from %s.%s where hint = 'heartbeat' and id <= 255
+		`,
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.GetChangelogTableName()),
+	)
+
+	readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
+		dbUri := connectionConfig.GetDBUri("information_schema")
+		var heartbeatValue string
+		if db, _, err := sqlutils.GetDB(dbUri); err != nil {
+			return lag, err
+		} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
+			return lag, err
+		}
+		lag, err = parseChangelogHeartbeat(heartbeatValue)
+		return lag, err
+	}
+
+	readControlReplicasLag := func() (result *mysql.ReplicationLagResult) {
+		instanceKeyMap := this.migrationContext.GetThrottleControlReplicaKeys()
+		if instanceKeyMap.Len() == 0 {
+			return result
+		}
+		lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len())
+		for replicaKey := range *instanceKeyMap {
+			connectionConfig := this.migrationContext.InspectorConnectionConfig.Duplicate()
+			connectionConfig.Key = replicaKey
+
+			lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
+			go func() {
+				lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
+				lagResults <- lagResult
+			}()
+		}
+		for range *instanceKeyMap {
+			lagResult := <-lagResults
+			if result == nil {
+				result = lagResult
+			} else if lagResult.Err != nil {
+				result = lagResult
+			} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
+				result = lagResult
+			}
+		}
+		return result
+	}
+
+	checkControlReplicasLag := func() {
 		if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
-			return nil
+			// No need to read lag
+			return
+		}
+		if result := readControlReplicasLag(); result != nil {
+			this.migrationContext.SetControlReplicasLagResult(result)
 		}
-		lagResult := mysql.GetMaxReplicationLag(
-			this.migrationContext.InspectorConnectionConfig,
-			this.migrationContext.GetThrottleControlReplicaKeys(),
-			replicationLagQuery,
-		)
-		this.migrationContext.SetControlReplicasLagResult(lagResult)
-		return nil
 	}
 	aggressiveTicker := time.Tick(100 * time.Millisecond)
 	relaxedFactor := 10
 	counter := 0
 	shouldReadLagAggressively := false
-	replicationLagQuery := ""
 
 	for range aggressiveTicker {
 		if counter%relaxedFactor == 0 {
 			// we only check if we wish to be aggressive once per second. The parameters for being aggressive
 			// do not typically change at all throughout the migration, but nonetheless we check them.
 			counter = 0
 			maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
-			replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
-			shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
+			shouldReadLagAggressively = (maxLagMillisecondsThrottleThreshold < 1000)
		}
		if counter == 0 || shouldReadLagAggressively {
			// We check replication lag every so often, or if we wish to be aggressive
-			readControlReplicasLag(replicationLagQuery)
+			checkControlReplicasLag()
		}
		counter++
	}

go/mysql/utils.go

Lines changed: 1 addition & 35 deletions
@@ -24,19 +24,13 @@ type ReplicationLagResult struct {
 
 // GetReplicationLag returns replication lag for a given connection config; either by explicit query
 // or via SHOW SLAVE STATUS
-func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
+func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
 	dbUri := connectionConfig.GetDBUri("information_schema")
 	var db *gosql.DB
 	if db, _, err = sqlutils.GetDB(dbUri); err != nil {
 		return replicationLag, err
 	}
 
-	if replicationLagQuery != "" {
-		var floatLag float64
-		err = db.QueryRow(replicationLagQuery).Scan(&floatLag)
-		return time.Duration(int64(floatLag*1000)) * time.Millisecond, err
-	}
-	// No explicit replication lag query.
 	err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
 		secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
 		if !secondsBehindMaster.Valid {
@@ -48,34 +42,6 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
 	return replicationLag, err
 }
 
-// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
-// each via GetReplicationLag
-func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (result *ReplicationLagResult) {
-	result = &ReplicationLagResult{Lag: 0}
-	if instanceKeyMap.Len() == 0 {
-		return result
-	}
-	lagResults := make(chan *ReplicationLagResult, instanceKeyMap.Len())
-	for key := range *instanceKeyMap {
-		connectionConfig := baseConnectionConfig.Duplicate()
-		connectionConfig.Key = key
-		result := &ReplicationLagResult{Key: connectionConfig.Key}
-		go func() {
-			result.Lag, result.Err = GetReplicationLag(connectionConfig, replicationLagQuery)
-			lagResults <- result
-		}()
-	}
-	for range *instanceKeyMap {
-		lagResult := <-lagResults
-		if lagResult.Err != nil {
-			result = lagResult
-		} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
-			result = lagResult
-		}
-	}
-	return result
-}
-
 func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
 	currentUri := connectionConfig.GetDBUri("information_schema")
 	db, _, err := sqlutils.GetDB(currentUri)
