Skip to content

Commit 3b377f1

Browse files
author
Shlomi Noach
authored
Merge pull request #111 from github/throttle-replicas-more-info
More informative information upon control-replicas lagging
2 parents 2f4d9b8 + e900dae commit 3b377f1

File tree

2 files changed

+28
-20
lines changed

2 files changed

+28
-20
lines changed

go/logic/migrator.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,12 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
158158
checkThrottleControlReplicas = false
159159
}
160160
if checkThrottleControlReplicas {
161-
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
162-
if err != nil {
163-
return true, err.Error()
161+
lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
162+
if lagResult.Err != nil {
163+
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
164164
}
165-
if replicationLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
166-
return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
165+
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
166+
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
167167
}
168168
}
169169

@@ -217,13 +217,15 @@ func (this *Migrator) initiateThrottler() error {
217217
// calls callback functions, if any
218218
func (this *Migrator) throttle(onThrottled func()) {
219219
for {
220+
// IsThrottled() is non-blocking: the throttling decision is made asynchronously,
221+
// so calling IsThrottled() here is cheap.
220222
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
221223
return
222224
}
223225
if onThrottled != nil {
224226
onThrottled()
225227
}
226-
time.Sleep(time.Second)
228+
time.Sleep(250 * time.Millisecond)
227229
}
228230
}
229231

go/mysql/utils.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ import (
1414
"github.com/outbrain/golib/sqlutils"
1515
)
1616

17+
type ReplicationLagResult struct {
18+
Key InstanceKey
19+
Lag time.Duration
20+
Err error
21+
}
22+
1723
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
1824
// or via SHOW SLAVE STATUS
1925
func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
@@ -32,7 +38,7 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
3238
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
3339
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
3440
if !secondsBehindMaster.Valid {
35-
return fmt.Errorf("Replication not running on %+v", connectionConfig.Key)
41+
return fmt.Errorf("replication not running")
3642
}
3743
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
3844
return nil
@@ -42,30 +48,30 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
4248

4349
// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
4450
// each via GetReplicationLag
45-
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (replicationLag time.Duration, err error) {
51+
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (result *ReplicationLagResult) {
52+
result = &ReplicationLagResult{Lag: 0}
4653
if instanceKeyMap.Len() == 0 {
47-
return 0, nil
54+
return result
4855
}
49-
lagsChan := make(chan time.Duration, instanceKeyMap.Len())
50-
errorsChan := make(chan error, instanceKeyMap.Len())
56+
lagResults := make(chan *ReplicationLagResult, instanceKeyMap.Len())
5157
for key := range *instanceKeyMap {
5258
connectionConfig := baseConnectionConfig.Duplicate()
5359
connectionConfig.Key = key
60+
result := &ReplicationLagResult{Key: connectionConfig.Key}
5461
go func() {
55-
lag, err := GetReplicationLag(connectionConfig, replicationLagQuery)
56-
lagsChan <- lag
57-
errorsChan <- err
62+
result.Lag, result.Err = GetReplicationLag(connectionConfig, replicationLagQuery)
63+
lagResults <- result
5864
}()
5965
}
6066
for range *instanceKeyMap {
61-
if lagError := <-errorsChan; lagError != nil {
62-
err = lagError
63-
}
64-
if lag := <-lagsChan; lag.Nanoseconds() > replicationLag.Nanoseconds() {
65-
replicationLag = lag
67+
lagResult := <-lagResults
68+
if lagResult.Err != nil {
69+
result = lagResult
70+
} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
71+
result = lagResult
6672
}
6773
}
68-
return replicationLag, err
74+
return result
6975
}
7076

7177
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {

0 commit comments

Comments
 (0)