11package canal
22
33import (
4+ "fmt"
5+ "log/slog"
46 "sync/atomic"
57 "time"
68
@@ -20,15 +22,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
2022 if err != nil {
2123 return nil , errors .Errorf ("start sync replication at binlog %v error %v" , pos , err )
2224 }
23- c .cfg .Logger .Infof ("start sync binlog at binlog file %v" , pos )
25+ c .cfg .Logger .Info ("start sync binlog at binlog file" , slog . Any ( "pos" , pos ) )
2426 return s , nil
2527 } else {
2628 gsetClone := gset .Clone ()
2729 s , err := c .syncer .StartSyncGTID (gset )
2830 if err != nil {
2931 return nil , errors .Errorf ("start sync replication at GTID set %v error %v" , gset , err )
3032 }
31- c .cfg .Logger .Infof ("start sync binlog at GTID set %v" , gsetClone )
33+ c .cfg .Logger .Info ("start sync binlog at GTID set" , slog . Any ( "gset" , gsetClone ) )
3234 return s , nil
3335 }
3436}
@@ -57,7 +59,7 @@ func (c *Canal) runSyncBinlog() error {
5759 // and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899
5860 if ev .Header .Timestamp == 0 {
5961 fakeRotateLogName := string (e .NextLogName )
60- c .cfg .Logger .Infof ("received fake rotate event, next log name is %s" , e .NextLogName )
62+ c .cfg .Logger .Info ("received fake rotate event" , slog . String ( "nextLogName" , string ( e .NextLogName )) )
6163
6264 if fakeRotateLogName != c .master .Position ().Name {
6365 c .cfg .Logger .Info ("log name changed, the fake rotate event will be handled as a real rotate event" )
@@ -93,7 +95,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
9395 case * replication.RotateEvent :
9496 pos .Name = string (e .NextLogName )
9597 pos .Pos = uint32 (e .Position )
96- c .cfg .Logger .Infof ("rotate binlog to %s" , pos )
98+ c .cfg .Logger .Info ("rotate binlog" , slog . Any ( "pos" , pos ) )
9799 savePos = true
98100 force = true
99101 if err = c .eventHandler .OnRotate (ev .Header , e ); err != nil {
@@ -103,7 +105,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
103105 // we only focus row based event
104106 err = c .handleRowsEvent (ev )
105107 if err != nil {
106- c .cfg .Logger .Errorf ( "handle rows event at (%s, %d) error %v " , pos .Name , curPos , err )
108+ c .cfg .Logger .Error ( fmt . Sprintf ( "handle rows event at (%s, %d)" , pos .Name , curPos ), slog . Any ( "error" , err ) )
107109 return errors .Trace (err )
108110 }
109111 return nil
@@ -113,7 +115,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
113115 for _ , subEvent := range ev .Events {
114116 err = c .handleEvent (subEvent )
115117 if err != nil {
116- c .cfg .Logger .Errorf ( "handle transaction payload subevent at (%s, %d) error %v " , pos .Name , curPos , err )
118+ c .cfg .Logger .Error ( fmt . Sprintf ( "handle transaction payload subevent at (%s, %d)" , pos .Name , curPos ), slog . Any ( "error" , err ) )
117119 return errors .Trace (err )
118120 }
119121 }
@@ -144,7 +146,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
144146 if err != nil {
145147 // The parser does not understand all syntax.
146148 // For example, it won't parse [CREATE|DROP] TRIGGER statements.
147- c .cfg .Logger .Errorf ( "parse query(%s) err %v , will skip this event" , e .Query , err )
149+ c .cfg .Logger .Error ( "error parsing query , will skip this event" , slog . String ( "query" , string ( e .Query )), slog . Any ( "error" , err ) )
148150 return nil
149151 }
150152 if len (stmts ) > 0 {
@@ -246,7 +248,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {
246248
247249func (c * Canal ) updateTable (header * replication.EventHeader , db , table string ) (err error ) {
248250 c .ClearTableCache ([]byte (db ), []byte (table ))
249- c .cfg .Logger .Infof ("table structure changed, clear table cache: %s.%s \n " , db , table )
251+ c .cfg .Logger .Info ("table structure changed, clear table cache" , slog . String ( "database" , db ), slog . String ( " table" , table ) )
250252 if err = c .eventHandler .OnTableChanged (header , db , table ); err != nil && errors .Cause (err ) != schema .ErrTableNotExist {
251253 return errors .Trace (err )
252254 }
@@ -316,7 +318,7 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
316318 if curPos .Compare (pos ) >= 0 {
317319 return nil
318320 } else {
319- c .cfg .Logger .Debugf ( "master pos is %v, wait catching %v" , curPos , pos )
321+ c .cfg .Logger .Debug ( fmt . Sprintf ( "master pos is %v, wait catching %v" , curPos , pos ) )
320322 time .Sleep (100 * time .Millisecond )
321323 }
322324 }
0 commit comments