@@ -55,9 +55,11 @@ type Migrator struct {
55
55
applier * Applier
56
56
eventsStreamer * EventsStreamer
57
57
server * Server
58
+ throttler * Throttler
58
59
hooksExecutor * HooksExecutor
59
60
migrationContext * base.MigrationContext
60
61
62
+ firstThrottlingCollected chan bool
61
63
tablesInPlace chan bool
62
64
rowCopyComplete chan bool
63
65
allEventsUpToLockProcessed chan bool
@@ -81,6 +83,7 @@ func NewMigrator() *Migrator {
81
83
migrationContext : base .GetMigrationContext (),
82
84
parser : sql .NewParser (),
83
85
tablesInPlace : make (chan bool ),
86
+ firstThrottlingCollected : make (chan bool , 1 ),
84
87
rowCopyComplete : make (chan bool ),
85
88
allEventsUpToLockProcessed : make (chan bool ),
86
89
panicAbort : make (chan error ),
@@ -119,8 +122,44 @@ func (this *Migrator) initiateHooksExecutor() (err error) {
119
122
}
120
123
121
124
// shouldThrottle performs checks to see whether we should currently be throttling.
122
- // It also checks for critical-load and panic aborts.
125
+ // It merely observes the metrics collected by other components, it does not issue
126
+ // its own metric collection.
123
127
func (this * Migrator ) shouldThrottle () (result bool , reason string ) {
128
+ generalCheckResult := this .migrationContext .GetThrottleGeneralCheckResult ()
129
+ if generalCheckResult .ShouldThrottle {
130
+ return generalCheckResult .ShouldThrottle , generalCheckResult .Reason
131
+ }
132
+ // Replication lag throttle
133
+ maxLagMillisecondsThrottleThreshold := atomic .LoadInt64 (& this .migrationContext .MaxLagMillisecondsThrottleThreshold )
134
+ lag := atomic .LoadInt64 (& this .migrationContext .CurrentLag )
135
+ if time .Duration (lag ) > time .Duration (maxLagMillisecondsThrottleThreshold )* time .Millisecond {
136
+ return true , fmt .Sprintf ("lag=%fs" , time .Duration (lag ).Seconds ())
137
+ }
138
+ checkThrottleControlReplicas := true
139
+ if (this .migrationContext .TestOnReplica || this .migrationContext .MigrateOnReplica ) && (atomic .LoadInt64 (& this .allEventsUpToLockProcessedInjectedFlag ) > 0 ) {
140
+ checkThrottleControlReplicas = false
141
+ }
142
+ if checkThrottleControlReplicas {
143
+ lagResult := this .migrationContext .GetControlReplicasLagResult ()
144
+ if lagResult .Err != nil {
145
+ return true , fmt .Sprintf ("%+v %+v" , lagResult .Key , lagResult .Err )
146
+ }
147
+ if lagResult .Lag > time .Duration (maxLagMillisecondsThrottleThreshold )* time .Millisecond {
148
+ return true , fmt .Sprintf ("%+v replica-lag=%fs" , lagResult .Key , lagResult .Lag .Seconds ())
149
+ }
150
+ }
151
+ // Got here? No metrics indicates we need throttling.
152
+ return false , ""
153
+ }
154
+
155
+ // readGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
156
+ func (this * Migrator ) readGeneralThrottleMetrics () error {
157
+
158
+ setThrottle := func (throttle bool , reason string ) error {
159
+ this .migrationContext .SetThrottleGeneralCheckResult (base .NewThrottleCheckResult (throttle , reason ))
160
+ return nil
161
+ }
162
+
124
163
// Regardless of throttle, we take opportunity to check for panic-abort
125
164
if this .migrationContext .PanicFlagFile != "" {
126
165
if base .FileExists (this .migrationContext .PanicFlagFile ) {
@@ -131,7 +170,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
131
170
for variableName , threshold := range criticalLoad {
132
171
value , err := this .applier .ShowStatusVariable (variableName )
133
172
if err != nil {
134
- return true , fmt .Sprintf ("%s %s" , variableName , err )
173
+ return setThrottle ( true , fmt .Sprintf ("%s %s" , variableName , err ) )
135
174
}
136
175
if value >= threshold {
137
176
this .panicAbort <- fmt .Errorf ("critical-load met: %s=%d, >=%d" , variableName , value , threshold )
@@ -142,62 +181,60 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
142
181
143
182
// User-based throttle
144
183
if atomic .LoadInt64 (& this .migrationContext .ThrottleCommandedByUser ) > 0 {
145
- return true , "commanded by user"
184
+ return setThrottle ( true , "commanded by user" )
146
185
}
147
186
if this .migrationContext .ThrottleFlagFile != "" {
148
187
if base .FileExists (this .migrationContext .ThrottleFlagFile ) {
149
188
// Throttle file defined and exists!
150
- return true , "flag-file"
189
+ return setThrottle ( true , "flag-file" )
151
190
}
152
191
}
153
192
if this .migrationContext .ThrottleAdditionalFlagFile != "" {
154
193
if base .FileExists (this .migrationContext .ThrottleAdditionalFlagFile ) {
155
194
// 2nd Throttle file defined and exists!
156
- return true , "flag-file"
157
- }
158
- }
159
- // Replication lag throttle
160
- maxLagMillisecondsThrottleThreshold := atomic .LoadInt64 (& this .migrationContext .MaxLagMillisecondsThrottleThreshold )
161
- lag := atomic .LoadInt64 (& this .migrationContext .CurrentLag )
162
- if time .Duration (lag ) > time .Duration (maxLagMillisecondsThrottleThreshold )* time .Millisecond {
163
- return true , fmt .Sprintf ("lag=%fs" , time .Duration (lag ).Seconds ())
164
- }
165
- checkThrottleControlReplicas := true
166
- if (this .migrationContext .TestOnReplica || this .migrationContext .MigrateOnReplica ) && (atomic .LoadInt64 (& this .allEventsUpToLockProcessedInjectedFlag ) > 0 ) {
167
- checkThrottleControlReplicas = false
168
- }
169
- if checkThrottleControlReplicas {
170
- lagResult := this .migrationContext .GetControlReplicasLagResult ()
171
- if lagResult .Err != nil {
172
- return true , fmt .Sprintf ("%+v %+v" , lagResult .Key , lagResult .Err )
173
- }
174
- if lagResult .Lag > time .Duration (maxLagMillisecondsThrottleThreshold )* time .Millisecond {
175
- return true , fmt .Sprintf ("%+v replica-lag=%fs" , lagResult .Key , lagResult .Lag .Seconds ())
195
+ return setThrottle (true , "flag-file" )
176
196
}
177
197
}
178
198
179
199
maxLoad := this .migrationContext .GetMaxLoad ()
180
200
for variableName , threshold := range maxLoad {
181
201
value , err := this .applier .ShowStatusVariable (variableName )
182
202
if err != nil {
183
- return true , fmt .Sprintf ("%s %s" , variableName , err )
203
+ return setThrottle ( true , fmt .Sprintf ("%s %s" , variableName , err ) )
184
204
}
185
205
if value >= threshold {
186
- return true , fmt .Sprintf ("max-load %s=%d >= %d" , variableName , value , threshold )
206
+ return setThrottle ( true , fmt .Sprintf ("max-load %s=%d >= %d" , variableName , value , threshold ) )
187
207
}
188
208
}
189
209
if this .migrationContext .GetThrottleQuery () != "" {
190
210
if res , _ := this .applier .ExecuteThrottleQuery (); res > 0 {
191
- return true , "throttle-query"
211
+ return setThrottle ( true , "throttle-query" )
192
212
}
193
213
}
194
214
195
- return false , ""
215
+ return setThrottle (false , "" )
216
+ }
217
+
218
+ // initiateThrottlerMetrics initiates the various processes that collect measurements
219
+ // that may affect throttling. There are several components, all running independently,
220
+ // that collect such metrics.
221
+ func (this * Migrator ) initiateThrottlerMetrics () {
222
+ go this .initiateHeartbeatReader ()
223
+ go this .initiateControlReplicasReader ()
224
+
225
+ go func () {
226
+ throttlerMetricsTick := time .Tick (1 * time .Second )
227
+ this .readGeneralThrottleMetrics ()
228
+ this .firstThrottlingCollected <- true
229
+ for range throttlerMetricsTick {
230
+ this .readGeneralThrottleMetrics ()
231
+ }
232
+ }()
196
233
}
197
234
198
235
// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling.
199
236
func (this * Migrator ) initiateThrottler () error {
200
- throttlerTick := time .Tick (1 * time .Second )
237
+ throttlerTick := time .Tick (100 * time .Millisecond )
201
238
202
239
throttlerFunction := func () {
203
240
alreadyThrottling , currentReason := this .migrationContext .IsThrottled ()
@@ -453,16 +490,15 @@ func (this *Migrator) Migrate() (err error) {
453
490
if err := this .countTableRows (); err != nil {
454
491
return err
455
492
}
456
-
457
493
if err := this .addDMLEventsListener (); err != nil {
458
494
return err
459
495
}
460
- go this .initiateHeartbeatReader ()
461
- go this .initiateControlReplicasReader ()
462
-
463
496
if err := this .applier .ReadMigrationRangeValues (); err != nil {
464
497
return err
465
498
}
499
+ go this .initiateThrottlerMetrics ()
500
+ log .Infof ("Waiting for first throttle metrics to be collected" )
501
+ <- this .firstThrottlingCollected
466
502
go this .initiateThrottler ()
467
503
if err := this .hooksExecutor .onBeforeRowCopy (); err != nil {
468
504
return err
@@ -1206,6 +1242,11 @@ func (this *Migrator) addDMLEventsListener() error {
1206
1242
return err
1207
1243
}
1208
1244
1245
+ func (this * Migrator ) initiateThrottler () error {
1246
+ this .throttler = NewThrottler (this .panicAbort )
1247
+ return nil
1248
+ }
1249
+
1209
1250
func (this * Migrator ) initiateApplier () error {
1210
1251
this .applier = NewApplier ()
1211
1252
if err := this .applier .InitDBConnections (); err != nil {
0 commit comments