6
6
package binlog
7
7
8
8
import (
9
- "bytes"
10
- "fmt"
11
9
"sync"
12
10
13
11
"github.com/github/gh-ost/go/base"
14
12
"github.com/github/gh-ost/go/mysql"
15
- "github.com/github/gh-ost/go/sql"
16
13
17
14
"time"
18
15
@@ -77,116 +74,13 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
77
74
return & returnCoordinates
78
75
}
79
76
80
- // handleRowsEvents processes a RowEvent from the binlog and sends the DML event to the entriesChannel.
81
- func (this * GoMySQLReader ) handleRowsEvent (ev * replication.BinlogEvent , rowsEvent * replication.RowsEvent , entriesChannel chan <- * BinlogEntry ) error {
82
- if this .currentCoordinates .IsLogPosOverflowBeyond4Bytes (& this .LastAppliedRowsEventHint ) {
83
- return fmt .Errorf ("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes" , this .currentCoordinates )
84
- }
85
-
86
- if this .currentCoordinates .SmallerThanOrEquals (& this .LastAppliedRowsEventHint ) {
87
- this .migrationContext .Log .Debugf ("Skipping handled query at %+v" , this .currentCoordinates )
88
- return nil
89
- }
90
-
91
- dml := ToEventDML (ev .Header .EventType .String ())
92
- if dml == NotDML {
93
- return fmt .Errorf ("Unknown DML type: %s" , ev .Header .EventType .String ())
94
- }
95
- for i , row := range rowsEvent .Rows {
96
- if dml == UpdateDML && i % 2 == 1 {
97
- // An update has two rows (WHERE+SET)
98
- // We do both at the same time
99
- continue
100
- }
101
- binlogEntry := NewBinlogEntryAt (this .currentCoordinates )
102
- binlogEntry .DmlEvent = NewBinlogDMLEvent (
103
- string (rowsEvent .Table .Schema ),
104
- string (rowsEvent .Table .Table ),
105
- dml ,
106
- )
107
- switch dml {
108
- case InsertDML :
109
-
110
- {
111
- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
112
- }
113
- case UpdateDML :
114
- {
115
- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
116
- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
117
- }
118
- case DeleteDML :
119
- {
120
- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
121
- }
122
- }
123
- // The channel will do the throttling. Whoever is reading from the channel
124
- // decides whether action is taken synchronously (meaning we wait before
125
- // next iteration) or asynchronously (we keep pushing more events)
126
- // In reality, reads will be synchronous
127
- entriesChannel <- binlogEntry
128
- }
129
- this .LastAppliedRowsEventHint = this .currentCoordinates
130
- return nil
131
- }
132
-
133
- // RowsEventToBinlogEntry processes MySQL RowsEvent into our BinlogEntry for later application.
134
- // copied from handleRowEvents
135
- func RowsEventToBinlogEntry (eventType replication.EventType , rowsEvent * replication.RowsEvent , binlogCoords mysql.BinlogCoordinates ) (* BinlogEntry , error ) {
136
- dml := ToEventDML (eventType .String ())
137
- if dml == NotDML {
138
- return nil , fmt .Errorf ("Unknown DML type: %s" , eventType .String ())
139
- }
140
- binlogEntry := NewBinlogEntryAt (binlogCoords )
141
- for i , row := range rowsEvent .Rows {
142
- if dml == UpdateDML && i % 2 == 1 {
143
- // An update has two rows (WHERE+SET)
144
- // We do both at the same time
145
- continue
146
- }
147
- binlogEntry .DmlEvent = NewBinlogDMLEvent (
148
- string (rowsEvent .Table .Schema ),
149
- string (rowsEvent .Table .Table ),
150
- dml ,
151
- )
152
- switch dml {
153
- case InsertDML :
154
- {
155
- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
156
- }
157
- case UpdateDML :
158
- {
159
- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
160
- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
161
- }
162
- case DeleteDML :
163
- {
164
- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
165
- }
166
- }
167
- }
168
- return binlogEntry , nil
169
- }
170
-
171
- type Transaction struct {
172
- SequenceNumber int64
173
- LastCommitted int64
174
- Changes chan * BinlogEntry
175
- }
176
-
177
- func (this * GoMySQLReader ) StreamTransactions (ctx context.Context , transactionsChannel chan <- * Transaction ) error {
178
- if err := ctx .Err (); err != nil {
179
- return err
180
- }
181
-
182
- previousSequenceNumber := int64 (0 )
183
-
184
- groups:
77
+ // StreamEvents reads binlog events and sends them to the given channel.
78
+ // It is blocking and should be executed in a goroutine.
79
+ func (this * GoMySQLReader ) StreamEvents (ctx context.Context , eventChannel chan <- * replication.BinlogEvent ) error {
185
80
for {
186
81
if err := ctx .Err (); err != nil {
187
82
return err
188
83
}
189
-
190
84
ev , err := this .binlogStreamer .GetEvent (ctx )
191
85
if err != nil {
192
86
return err
@@ -197,160 +91,8 @@ groups:
197
91
this .currentCoordinates .LogPos = int64 (ev .Header .LogPos )
198
92
this .currentCoordinates .EventSize = int64 (ev .Header .EventSize )
199
93
}()
200
-
201
- fmt .Printf ("Event: %s\n " , ev .Header .EventType )
202
-
203
- // Read each event and do something with it
204
- //
205
- // First, ignore all events until we find the next GTID event so that we can start
206
- // at a transaction boundary.
207
- //
208
- // Once we find a GTID event, we can start an event group,
209
- // and then process all events in that group.
210
- // An event group is defined as all events that are part of the same transaction,
211
- // which is defined as all events between the GTID event, a `QueryEvent` containing a `BEGIN` query and ends with
212
- // either a XIDEvent or a `QueryEvent` containing a `COMMIT` or `ROLLBACK` query.
213
- //
214
- // Each group is a struct containing the SequenceNumber, LastCommitted, and a channel of events.
215
- //
216
- // Once the group has ended, we can start looking for the next GTID event.
217
-
218
- var group * Transaction
219
- switch binlogEvent := ev .Event .(type ) {
220
- case * replication.GTIDEvent :
221
- this .migrationContext .Log .Infof ("GTIDEvent: %+v" , binlogEvent )
222
-
223
- // Bail out if we find a gap in the sequence numbers
224
- if previousSequenceNumber != 0 && binlogEvent .SequenceNumber != previousSequenceNumber + 1 {
225
- return fmt .Errorf ("unexpected sequence number: %d, expected %d" , binlogEvent .SequenceNumber , previousSequenceNumber + 1 )
226
- }
227
-
228
- group = & Transaction {
229
- SequenceNumber : binlogEvent .SequenceNumber ,
230
- LastCommitted : binlogEvent .LastCommitted ,
231
- Changes : make (chan * BinlogEntry , 1000 ),
232
- }
233
-
234
- previousSequenceNumber = binlogEvent .SequenceNumber
235
-
236
- // We are good to send the transaction, the transaction events arrive async
237
- this .migrationContext .Log .Infof ("sending transaction: %d %d" , group .SequenceNumber , group .LastCommitted )
238
- transactionsChannel <- group
239
- default :
240
- this .migrationContext .Log .Infof ("Ignoring Event: %+v" , ev .Event )
241
- continue
242
- }
243
-
244
- // Next event should be a query event
245
-
246
- ev , err = this .binlogStreamer .GetEvent (ctx )
247
- if err != nil {
248
- close (group .Changes )
249
- return err
250
- }
251
- this .migrationContext .Log .Infof ("1 - Event: %s" , ev .Header .EventType )
252
-
253
- switch binlogEvent := ev .Event .(type ) {
254
- case * replication.QueryEvent :
255
- if bytes .Equal ([]byte ("BEGIN" ), binlogEvent .Query ) {
256
- this .migrationContext .Log .Infof ("BEGIN for transaction in schema %s" , binlogEvent .Schema )
257
- } else {
258
- this .migrationContext .Log .Infof ("QueryEvent: %+v" , binlogEvent )
259
- this .migrationContext .Log .Infof ("Query: %s" , binlogEvent .Query )
260
-
261
- close (group .Changes )
262
-
263
- // wait for the next event group
264
- continue groups
265
- }
266
- default :
267
- this .migrationContext .Log .Infof ("unexpected Event: %+v" , ev .Event )
268
- close (group .Changes )
269
-
270
- // TODO: handle the group - we want to make sure we process the group's LastCommitted and SequenceNumber
271
-
272
- // wait for the next event group
273
- continue groups
274
- }
275
-
276
- // Next event should be a table map event
277
-
278
- events:
279
- // Now we can start processing the group
280
- for {
281
- ev , err = this .binlogStreamer .GetEvent (ctx )
282
- if err != nil {
283
- close (group .Changes )
284
- return err
285
- }
286
- this .migrationContext .Log .Infof ("3 - Event: %s" , ev .Header .EventType )
287
-
288
- switch binlogEvent := ev .Event .(type ) {
289
- case * replication.TableMapEvent :
290
- this .migrationContext .Log .Infof ("TableMapEvent for %s.%s: %+v" , binlogEvent .Schema , binlogEvent .Table , binlogEvent )
291
- case * replication.RowsEvent :
292
- binlogEntry , err := RowsEventToBinlogEntry (ev .Header .EventType , binlogEvent , this .currentCoordinates )
293
- if err != nil {
294
- close (group .Changes )
295
- return err
296
- }
297
- this .migrationContext .Log .Infof ("RowsEvent: %v" , binlogEvent )
298
- group .Changes <- binlogEntry
299
- this .migrationContext .Log .Infof ("Length of group.Changes: %d" , len (group .Changes ))
300
- case * replication.XIDEvent :
301
- this .migrationContext .Log .Infof ("XIDEvent: %+v" , binlogEvent )
302
- this .migrationContext .Log .Infof ("COMMIT for transaction" )
303
- break events
304
- default :
305
- close (group .Changes )
306
- this .migrationContext .Log .Infof ("unexpected Event: %+v" , ev .Event )
307
- return fmt .Errorf ("unexpected Event: %+v" , ev .Event )
308
- }
309
- }
310
-
311
- close (group .Changes )
312
-
313
- this .migrationContext .Log .Infof ("done processing group - %d events" , len (group .Changes ))
314
- }
315
- }
316
-
317
- // StreamEvents
318
- func (this * GoMySQLReader ) StreamEvents (canStopStreaming func () bool , entriesChannel chan <- * BinlogEntry ) error {
319
- if canStopStreaming () {
320
- return nil
321
- }
322
- for {
323
- if canStopStreaming () {
324
- break
325
- }
326
- ev , err := this .binlogStreamer .GetEvent (context .Background ())
327
- if err != nil {
328
- return err
329
- }
330
- func () {
331
- this .currentCoordinatesMutex .Lock ()
332
- defer this .currentCoordinatesMutex .Unlock ()
333
- this .currentCoordinates .LogPos = int64 (ev .Header .LogPos )
334
- this .currentCoordinates .EventSize = int64 (ev .Header .EventSize )
335
- }()
336
-
337
- switch binlogEvent := ev .Event .(type ) {
338
- case * replication.RotateEvent :
339
- func () {
340
- this .currentCoordinatesMutex .Lock ()
341
- defer this .currentCoordinatesMutex .Unlock ()
342
- this .currentCoordinates .LogFile = string (binlogEvent .NextLogName )
343
- }()
344
- this .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , this .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), binlogEvent .NextLogName )
345
- case * replication.RowsEvent :
346
- if err := this .handleRowsEvent (ev , binlogEvent , entriesChannel ); err != nil {
347
- return err
348
- }
349
- }
94
+ eventChannel <- ev
350
95
}
351
- this .migrationContext .Log .Debugf ("done streaming events" )
352
-
353
- return nil
354
96
}
355
97
356
98
func (this * GoMySQLReader ) Close () error {
0 commit comments