Skip to content

Commit 48ce087

Browse files
committed
Store lastHeartbeatOnChangelogTime instead of CurrentHeartbeatLag
1 parent 4efd156 commit 48ce087

File tree

3 files changed

+28
-10
lines changed

3 files changed

+28
-10
lines changed

go/base/context.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ type MigrationContext struct {
178178
RenameTablesEndTime time.Time
179179
pointOfInterestTime time.Time
180180
pointOfInterestTimeMutex *sync.Mutex
181-
CurrentHeartbeatLag int64
181+
lastHeartbeatOnChangelogTime time.Time
182+
lastHeartbeatOnChangelogMutex *sync.Mutex
182183
CurrentLag int64
183184
currentProgress uint64
184185
ThrottleHTTPStatusCode int64
@@ -272,6 +273,7 @@ func NewMigrationContext() *MigrationContext {
272273
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
273274
configMutex: &sync.Mutex{},
274275
pointOfInterestTimeMutex: &sync.Mutex{},
276+
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
275277
ColumnRenameMap: make(map[string]string),
276278
PanicAbort: make(chan error),
277279
Log: NewDefaultLogger(),
@@ -455,8 +457,8 @@ func (this *MigrationContext) MarkRowCopyEndTime() {
455457
this.RowCopyEndTime = time.Now()
456458
}
457459

458-
func (this *MigrationContext) GetCurrentHeartbeatLagDuration() time.Duration {
459-
return time.Duration(atomic.LoadInt64(&this.CurrentHeartbeatLag))
460+
func (this *MigrationContext) TimeSinceLastHeartbeatOnChangelog() time.Duration {
461+
return time.Since(this.GetLastHeartbeatOnChangelogTime())
460462
}
461463

462464
func (this *MigrationContext) GetCurrentLagDuration() time.Duration {
@@ -498,6 +500,20 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
498500
return time.Since(this.pointOfInterestTime)
499501
}
500502

503+
func (this *MigrationContext) SetLastHeartbeatOnChangelogTime(t time.Time) {
504+
this.lastHeartbeatOnChangelogMutex.Lock()
505+
defer this.lastHeartbeatOnChangelogMutex.Unlock()
506+
507+
this.lastHeartbeatOnChangelogTime = t
508+
}
509+
510+
func (this *MigrationContext) GetLastHeartbeatOnChangelogTime() time.Time {
511+
this.lastHeartbeatOnChangelogMutex.Lock()
512+
defer this.lastHeartbeatOnChangelogMutex.Unlock()
513+
514+
return this.lastHeartbeatOnChangelogTime
515+
}
516+
501517
func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) {
502518
if heartbeatIntervalMilliseconds < 100 {
503519
heartbeatIntervalMilliseconds = 100

go/logic/hooks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
6464
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
6565
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
6666
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
67-
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds()))
67+
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
6868
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
6969
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
7070
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))

go/logic/migrator.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,12 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
255255

256256
func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
257257
changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3)
258-
if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil {
258+
259+
heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString)
260+
if err != nil {
259261
return this.migrationContext.Log.Errore(err)
260262
} else {
261-
atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag))
263+
this.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime)
262264
return nil
263265
}
264266
}
@@ -521,10 +523,10 @@ func (this *Migrator) cutOver() (err error) {
521523
this.migrationContext.Log.Infof("Waiting for heartbeat lag to be low enough to proceed")
522524
this.sleepWhileTrue(
523525
func() (bool, error) {
524-
currentHeartbeatLag := atomic.LoadInt64(&this.migrationContext.CurrentHeartbeatLag)
526+
heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog()
525527
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
526-
if time.Duration(currentHeartbeatLag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
527-
this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds())
528+
if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
529+
this.migrationContext.Log.Infof("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds())
528530
return true, nil
529531
} else {
530532
return false, nil
@@ -1003,7 +1005,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
10031005
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
10041006
currentBinlogCoordinates,
10051007
this.migrationContext.GetCurrentLagDuration().Seconds(),
1006-
this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds(),
1008+
this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(),
10071009
state,
10081010
eta,
10091011
)

0 commit comments

Comments
 (0)