Skip to content

Commit 904215e

Browse files
author
Shlomi Noach
authored
Merge pull request #204 from github/reduce-minimum-max-lag
Reduce minimum maxLagMillisecondsThrottleThreshold to 100ms
2 parents f94651f + b2c7193 commit 904215e

File tree

6 files changed

+375
-214
lines changed

6 files changed

+375
-214
lines changed

doc/interactive-commands.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
1818
- `status`: returns a detailed status summary of migration progress and configuration
1919
- `sup`: returns a brief status summary of migration progress
2020
- `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
21-
- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `1000`, i.e. 1 second)
21+
- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` seconds)
2222
- `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
2323
The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`. For example: `Threads_running=50,threads_connected=1000`, and you would then write/echo `max-load=Threads_running=50,threads_connected=1000` to the socket.
2424
- `critical-load=<load>`: change critical load setting (exceeding given thresholds causes panic and abort)

go/base/context.go

Lines changed: 83 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@ var (
4141
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
4242
)
4343

44+
// ThrottleCheckResult pairs a boolean throttle decision (ShouldThrottle) with
// a string accompanying that decision (Reason).
type ThrottleCheckResult struct {
45+
ShouldThrottle bool
46+
Reason string
47+
}
48+
49+
// NewThrottleCheckResult returns a pointer to a newly allocated
// ThrottleCheckResult whose ShouldThrottle and Reason fields are set from the
// throttle and reason arguments, respectively.
func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
50+
return &ThrottleCheckResult{
51+
ShouldThrottle: throttle,
52+
Reason: reason,
53+
}
54+
}
55+
4456
// MigrationContext has the general, global state of migration. It is used by
4557
// all components throughout the migration process.
4658
type MigrationContext struct {
@@ -64,6 +76,7 @@ type MigrationContext struct {
6476
CliUser string
6577
CliPassword string
6678

79+
HeartbeatIntervalMilliseconds int64
6780
defaultNumRetries int64
6881
ChunkSize int64
6982
niceRatio float64
@@ -95,32 +108,36 @@ type MigrationContext struct {
95108
InitiallyDropGhostTable bool
96109
CutOverType CutOver
97110

98-
Hostname string
99-
TableEngine string
100-
RowsEstimate int64
101-
RowsDeltaEstimate int64
102-
UsedRowsEstimateMethod RowsEstimateMethod
103-
HasSuperPrivilege bool
104-
OriginalBinlogFormat string
105-
OriginalBinlogRowImage string
106-
InspectorConnectionConfig *mysql.ConnectionConfig
107-
ApplierConnectionConfig *mysql.ConnectionConfig
108-
StartTime time.Time
109-
RowCopyStartTime time.Time
110-
RowCopyEndTime time.Time
111-
LockTablesStartTime time.Time
112-
RenameTablesStartTime time.Time
113-
RenameTablesEndTime time.Time
114-
pointOfInterestTime time.Time
115-
pointOfInterestTimeMutex *sync.Mutex
116-
CurrentLag int64
117-
TotalRowsCopied int64
118-
TotalDMLEventsApplied int64
119-
isThrottled bool
120-
throttleReason string
121-
throttleMutex *sync.Mutex
122-
IsPostponingCutOver int64
123-
CountingRowsFlag int64
111+
Hostname string
112+
TableEngine string
113+
RowsEstimate int64
114+
RowsDeltaEstimate int64
115+
UsedRowsEstimateMethod RowsEstimateMethod
116+
HasSuperPrivilege bool
117+
OriginalBinlogFormat string
118+
OriginalBinlogRowImage string
119+
InspectorConnectionConfig *mysql.ConnectionConfig
120+
ApplierConnectionConfig *mysql.ConnectionConfig
121+
StartTime time.Time
122+
RowCopyStartTime time.Time
123+
RowCopyEndTime time.Time
124+
LockTablesStartTime time.Time
125+
RenameTablesStartTime time.Time
126+
RenameTablesEndTime time.Time
127+
pointOfInterestTime time.Time
128+
pointOfInterestTimeMutex *sync.Mutex
129+
CurrentLag int64
130+
controlReplicasLagResult mysql.ReplicationLagResult
131+
TotalRowsCopied int64
132+
TotalDMLEventsApplied int64
133+
isThrottled bool
134+
throttleReason string
135+
throttleGeneralCheckResult ThrottleCheckResult
136+
throttleMutex *sync.Mutex
137+
IsPostponingCutOver int64
138+
CountingRowsFlag int64
139+
AllEventsUpToLockProcessedInjectedFlag int64
140+
CleanupImminentFlag int64
124141

125142
OriginalTableColumns *sql.ColumnList
126143
OriginalTableUniqueKeys [](*sql.UniqueKey)
@@ -326,9 +343,19 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
326343
return time.Since(this.pointOfInterestTime)
327344
}
328345

346+
// SetHeartbeatIntervalMilliseconds stores the given interval in
// HeartbeatIntervalMilliseconds after clamping it to the range [100, 1000]:
// values below 100 become 100 and values above 1000 become 1000.
// The field is written with a plain assignment (no atomic store, no lock).
func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) {
347+
if heartbeatIntervalMilliseconds < 100 {
348+
heartbeatIntervalMilliseconds = 100
349+
}
350+
if heartbeatIntervalMilliseconds > 1000 {
351+
heartbeatIntervalMilliseconds = 1000
352+
}
353+
this.HeartbeatIntervalMilliseconds = heartbeatIntervalMilliseconds
354+
}
355+
329356
// SetMaxLagMillisecondsThrottleThreshold atomically stores the given threshold
// in MaxLagMillisecondsThrottleThreshold after raising it to a minimum value.
// In this diff the minimum changes from 1000 (removed lines) to 100 (added
// lines); no upper bound is applied.
func (this *MigrationContext) SetMaxLagMillisecondsThrottleThreshold(maxLagMillisecondsThrottleThreshold int64) {
330-
if maxLagMillisecondsThrottleThreshold < 1000 {
331-
maxLagMillisecondsThrottleThreshold = 1000
357+
if maxLagMillisecondsThrottleThreshold < 100 {
358+
maxLagMillisecondsThrottleThreshold = 100
332359
}
333360
atomic.StoreInt64(&this.MaxLagMillisecondsThrottleThreshold, maxLagMillisecondsThrottleThreshold)
334361
}
@@ -343,6 +370,20 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
343370
atomic.StoreInt64(&this.ChunkSize, chunkSize)
344371
}
345372

373+
// SetThrottleGeneralCheckResult copies the value pointed to by checkResult into
// throttleGeneralCheckResult while holding throttleMutex, then returns the same
// pointer it was given. checkResult is dereferenced unconditionally, so a nil
// argument panics.
func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
374+
this.throttleMutex.Lock()
375+
defer this.throttleMutex.Unlock()
376+
this.throttleGeneralCheckResult = *checkResult
377+
return checkResult
378+
}
379+
380+
// GetThrottleGeneralCheckResult copies throttleGeneralCheckResult into a local
// variable while holding throttleMutex and returns a pointer to that copy, so
// the caller does not receive a pointer to the stored field itself.
func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResult {
381+
this.throttleMutex.Lock()
382+
defer this.throttleMutex.Unlock()
383+
result := this.throttleGeneralCheckResult
384+
return &result
385+
}
386+
346387
func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
347388
this.throttleMutex.Lock()
348389
defer this.throttleMutex.Unlock()
@@ -454,6 +495,20 @@ func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error {
454495
return nil
455496
}
456497

498+
// GetControlReplicasLagResult returns, by value, a copy of
// controlReplicasLagResult read while holding throttleMutex.
func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLagResult {
499+
this.throttleMutex.Lock()
500+
defer this.throttleMutex.Unlock()
501+
502+
lagResult := this.controlReplicasLagResult
503+
return lagResult
504+
}
505+
506+
// SetControlReplicasLagResult copies the value pointed to by lagResult into
// controlReplicasLagResult while holding throttleMutex. lagResult is
// dereferenced unconditionally, so a nil argument panics.
func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) {
507+
this.throttleMutex.Lock()
508+
defer this.throttleMutex.Unlock()
509+
this.controlReplicasLagResult = *lagResult
510+
}
511+
457512
func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap {
458513
this.throttleMutex.Lock()
459514
defer this.throttleMutex.Unlock()

go/cmd/gh-ost/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func main() {
8181
replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
8282
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
8383
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
84+
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value")
8485
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
8586
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
8687
flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@@ -184,6 +185,7 @@ func main() {
184185
if migrationContext.ServeSocketFile == "" {
185186
migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
186187
}
188+
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
187189
migrationContext.SetNiceRatio(*niceRatio)
188190
migrationContext.SetChunkSize(*chunkSize)
189191
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)

go/logic/applier.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
258258

259259
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
260260
// This is done asynchronously
261-
func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
261+
func (this *Applier) InitiateHeartbeat() {
262262
var numSuccessiveFailures int64
263263
injectHeartbeat := func() error {
264264
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
@@ -273,10 +273,10 @@ func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
273273
}
274274
injectHeartbeat()
275275

276-
heartbeatTick := time.Tick(time.Duration(heartbeatIntervalMilliseconds) * time.Millisecond)
276+
heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
277277
for range heartbeatTick {
278278
// Generally speaking, we would issue a goroutine, but I'd actually rather
279-
// have this blocked rather than spam the master in the event something
279+
// have this block the loop rather than spam the master in the event something
280280
// goes wrong
281281
if err := injectHeartbeat(); err != nil {
282282
return

0 commit comments

Comments
 (0)