@@ -23,11 +23,12 @@ const (
23
23
)
24
24
25
25
type GoMySQLReader struct {
26
- connectionConfig * mysql.ConnectionConfig
27
- binlogSyncer * replication.BinlogSyncer
28
- binlogStreamer * replication.BinlogStreamer
29
- tableMap map [uint64 ]string
30
- currentCoordinates mysql.BinlogCoordinates
26
+ connectionConfig * mysql.ConnectionConfig
27
+ binlogSyncer * replication.BinlogSyncer
28
+ binlogStreamer * replication.BinlogStreamer
29
+ tableMap map [uint64 ]string
30
+ currentCoordinates mysql.BinlogCoordinates
31
+ lastHandledCoordinates mysql.BinlogCoordinates
31
32
}
32
33
33
34
func NewGoMySQLReader (connectionConfig * mysql.ConnectionConfig ) (binlogReader * GoMySQLReader , err error ) {
@@ -39,24 +40,91 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
39
40
}
40
41
binlogReader .binlogSyncer = replication .NewBinlogSyncer (serverId , "mysql" )
41
42
42
- // Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password
43
- err = binlogReader .binlogSyncer .RegisterSlave (connectionConfig .Key .Hostname , uint16 (connectionConfig .Key .Port ), connectionConfig .User , connectionConfig .Password )
44
- if err != nil {
45
- return binlogReader , err
46
- }
47
-
48
43
return binlogReader , err
49
44
}
50
45
51
46
// ConnectBinlogStreamer
52
47
func (this * GoMySQLReader ) ConnectBinlogStreamer (coordinates mysql.BinlogCoordinates ) (err error ) {
48
+ if coordinates .IsEmpty () {
49
+ return log .Errorf ("Emptry coordinates at ConnectBinlogStreamer()" )
50
+ }
51
+ log .Infof ("Registering replica at %+v:%+v" , this .connectionConfig .Key .Hostname , uint16 (this .connectionConfig .Key .Port ))
52
+ if err := this .binlogSyncer .RegisterSlave (this .connectionConfig .Key .Hostname , uint16 (this .connectionConfig .Key .Port ), this .connectionConfig .User , this .connectionConfig .Password ); err != nil {
53
+ return err
54
+ }
55
+
53
56
this .currentCoordinates = coordinates
57
+ log .Infof ("Connecting binlog streamer at %+v" , this .currentCoordinates )
54
58
// Start sync with sepcified binlog file and position
55
- this .binlogStreamer , err = this .binlogSyncer .StartSync (gomysql.Position {coordinates . LogFile , uint32 (coordinates .LogPos )})
59
+ this .binlogStreamer , err = this .binlogSyncer .StartSync (gomysql.Position {this . currentCoordinates . LogFile , uint32 (this . currentCoordinates .LogPos )})
56
60
57
61
return err
58
62
}
59
63
64
+ func (this * GoMySQLReader ) Reconnect () error {
65
+ this .binlogSyncer .Close ()
66
+
67
+ connectCoordinates := & this .lastHandledCoordinates
68
+ if connectCoordinates .IsEmpty () {
69
+ connectCoordinates = & this .currentCoordinates
70
+ }
71
+ if err := this .ConnectBinlogStreamer (* connectCoordinates ); err != nil {
72
+ return err
73
+ }
74
+ return nil
75
+ }
76
+
77
+ func (this * GoMySQLReader ) GetCurrentBinlogCoordinates () * mysql.BinlogCoordinates {
78
+ return & this .currentCoordinates
79
+ }
80
+
81
+ // StreamEvents
82
+ func (this * GoMySQLReader ) handleRowsEvent (ev * replication.BinlogEvent , rowsEvent * replication.RowsEvent , entriesChannel chan <- * BinlogEntry ) error {
83
+ if this .currentCoordinates .SmallerThanOrEquals (& this .lastHandledCoordinates ) && ! this .lastHandledCoordinates .IsEmpty () {
84
+ log .Infof ("Skipping handled query at %+v" , this .currentCoordinates )
85
+ return nil
86
+ }
87
+
88
+ dml := ToEventDML (ev .Header .EventType .String ())
89
+ if dml == NotDML {
90
+ return fmt .Errorf ("Unknown DML type: %s" , ev .Header .EventType .String ())
91
+ }
92
+ for i , row := range rowsEvent .Rows {
93
+ if dml == UpdateDML && i % 2 == 1 {
94
+ // An update has two rows (WHERE+SET)
95
+ // We do both at the same time
96
+ continue
97
+ }
98
+ binlogEntry := NewBinlogEntryAt (this .currentCoordinates )
99
+ binlogEntry .DmlEvent = NewBinlogDMLEvent (
100
+ string (rowsEvent .Table .Schema ),
101
+ string (rowsEvent .Table .Table ),
102
+ dml ,
103
+ )
104
+ switch dml {
105
+ case InsertDML :
106
+ {
107
+ binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
108
+ }
109
+ case UpdateDML :
110
+ {
111
+ binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
112
+ binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
113
+ }
114
+ case DeleteDML :
115
+ {
116
+ binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
117
+ }
118
+ }
119
+ // The channel will do the throttling. Whoever is reding from the channel
120
+ // decides whether action is taken sycnhronously (meaning we wait before
121
+ // next iteration) or asynchronously (we keep pushing more events)
122
+ // In reality, reads will be synchronous
123
+ entriesChannel <- binlogEntry
124
+ }
125
+ return nil
126
+ }
127
+
60
128
// StreamEvents
61
129
func (this * GoMySQLReader ) StreamEvents (canStopStreaming func () bool , entriesChannel chan <- * BinlogEntry ) error {
62
130
for {
@@ -77,44 +145,11 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
77
145
// future I should remove this.
78
146
this .tableMap [tableMapEvent .TableID ] = string (tableMapEvent .Table )
79
147
} else if rowsEvent , ok := ev .Event .(* replication.RowsEvent ); ok {
80
- dml := ToEventDML (ev .Header .EventType .String ())
81
- if dml == NotDML {
82
- return fmt .Errorf ("Unknown DML type: %s" , ev .Header .EventType .String ())
83
- }
84
- for i , row := range rowsEvent .Rows {
85
- if dml == UpdateDML && i % 2 == 1 {
86
- // An update has two rows (WHERE+SET)
87
- // We do both at the same time
88
- continue
89
- }
90
- binlogEntry := NewBinlogEntryAt (this .currentCoordinates )
91
- binlogEntry .DmlEvent = NewBinlogDMLEvent (
92
- string (rowsEvent .Table .Schema ),
93
- string (rowsEvent .Table .Table ),
94
- dml ,
95
- )
96
- switch dml {
97
- case InsertDML :
98
- {
99
- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (row )
100
- }
101
- case UpdateDML :
102
- {
103
- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
104
- binlogEntry .DmlEvent .NewColumnValues = sql .ToColumnValues (rowsEvent .Rows [i + 1 ])
105
- }
106
- case DeleteDML :
107
- {
108
- binlogEntry .DmlEvent .WhereColumnValues = sql .ToColumnValues (row )
109
- }
110
- }
111
- // The channel will do the throttling. Whoever is reding from the channel
112
- // decides whether action is taken sycnhronously (meaning we wait before
113
- // next iteration) or asynchronously (we keep pushing more events)
114
- // In reality, reads will be synchronous
115
- entriesChannel <- binlogEntry
148
+ if err := this .handleRowsEvent (ev , rowsEvent , entriesChannel ); err != nil {
149
+ return err
116
150
}
117
151
}
152
+ this .lastHandledCoordinates = this .currentCoordinates
118
153
}
119
154
log .Debugf ("done streaming events" )
120
155
0 commit comments