Skip to content

Commit f2adacf

Browse files
dbnskilance6716dveeden
authored
Adds OnTableNotFound() error handler to Canal (#1105)
* Adds OnTableNotFound() error handler to Canal * Apply suggestions from code review Co-authored-by: Daniël van Eeden <github@myname.nl> --------- Co-authored-by: lance6716 <lance6716@gmail.com> Co-authored-by: Daniël van Eeden <github@myname.nl>
1 parent d8173d0 commit f2adacf

File tree

2 files changed

+16
-4
lines changed

2 files changed

+16
-4
lines changed

canal/handler.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package canal
33
import (
44
"github.com/go-mysql-org/go-mysql/mysql"
55
"github.com/go-mysql-org/go-mysql/replication"
6+
"github.com/go-mysql-org/go-mysql/schema"
67
)
78

89
type EventHandler interface {
@@ -21,6 +22,9 @@ type EventHandler interface {
2122
// You'll get the original executed query, with comments if present.
2223
// It will be called before OnRow.
2324
OnRowsQueryEvent(e *replication.RowsQueryEvent) error
25+
// OnTableNotFound is called when a Rows Event references a table object
26+
// that no longer exists.
27+
OnTableNotFound(*replication.EventHeader, *replication.RowsEvent) error
2428
String() string
2529
}
2630

@@ -51,6 +55,11 @@ func (h *DummyEventHandler) OnRowsQueryEvent(*replication.RowsQueryEvent) error
5155
return nil
5256
}
5357

58+
// OnTableNotFound is called for row events for reference tables that are not found
59+
func (h *DummyEventHandler) OnTableNotFound(header *replication.EventHeader, e *replication.RowsEvent) error {
60+
return schema.ErrTableNotExist
61+
}
62+
5463
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
5564

5665
// `SetEventHandler` registers the sync handler, you must register your

canal/sync.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -270,12 +270,15 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
270270

271271
t, err := c.GetTable(schemaName, tableName)
272272
if err != nil {
273-
e := errors.Cause(err)
273+
cause := errors.Cause(err)
274274
// ignore errors below
275-
if e == ErrExcludedTable || e == schema.ErrTableNotExist || e == schema.ErrMissingTableMeta {
276-
err = nil
275+
if cause == ErrExcludedTable || cause == schema.ErrMissingTableMeta {
276+
return nil
277+
}
278+
// Allow handler to decide what to do when table is missing.
279+
if cause == schema.ErrTableNotExist {
280+
return c.eventHandler.OnTableNotFound(e.Header, ev)
277281
}
278-
279282
return err
280283
}
281284
var action string

0 commit comments

Comments
 (0)