Skip to content

Commit 19562fd

Browse files
wk989898hongyunyan
authored andcommitted
consumer: make the consumer more robust (#3219)
ref #1327
1 parent 1e7caec commit 19562fd

File tree

3 files changed

+45
-25
lines changed

3 files changed

+45
-25
lines changed

cmd/kafka-consumer/writer.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type writer struct {
7979
ddlList []*commonEvent.DDLEvent
8080
ddlWithMaxCommitTs map[int64]uint64
8181

82-
// this should only be used by the canal-json protocol
82+
// this should be used by the canal-json, avro and open protocol
8383
partitionTableAccessor *common.PartitionTableAccessor
8484

8585
eventRouter *eventrouter.EventRouter
@@ -354,7 +354,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
354354
}
355355

356356
w.onDDL(ddl)
357-
358357
// DDL is broadcast to all partitions, but only handle the DDL from partition-0.
359358
if partition != 0 {
360359
return false
@@ -457,19 +456,25 @@ func (w *writer) Write(ctx context.Context, messageType common.MessageType) bool
457456

458457
func (w *writer) onDDL(ddl *commonEvent.DDLEvent) {
459458
switch w.protocol {
460-
case config.ProtocolCanalJSON, config.ProtocolOpen:
459+
case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro:
461460
default:
462461
return
463462
}
464-
if ddl.Type != byte(timodel.ActionCreateTable) {
465-
return
466-
}
467-
stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "")
468-
if err != nil {
469-
log.Panic("parse ddl query failed", zap.String("query", ddl.Query), zap.Error(err))
470-
}
471-
if v, ok := stmt.(*ast.CreateTableStmt); ok && v.Partition != nil {
472-
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
463+
// TODO: support more corner cases
464+
// e.g. create partition table + drop table(rename table) + create normal table: the partitionTableAccessor should drop the table when the table become normal.
465+
switch timodel.ActionType(ddl.Type) {
466+
case timodel.ActionCreateTable:
467+
stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "")
468+
if err != nil {
469+
log.Panic("parse ddl query failed", zap.String("query", ddl.Query), zap.Error(err))
470+
}
471+
if v, ok := stmt.(*ast.CreateTableStmt); ok && v.Partition != nil {
472+
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
473+
}
474+
case timodel.ActionRenameTable:
475+
if w.partitionTableAccessor.IsPartitionTable(ddl.ExtraSchemaName, ddl.ExtraTableName) {
476+
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
477+
}
473478
}
474479
}
475480

@@ -548,11 +553,11 @@ func (w *writer) appendRow2Group(dml *commonEvent.DMLEvent, progress *partitionP
548553
zap.Stringer("eventType", dml.RowTypes[0]),
549554
// zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
550555
zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition))
551-
case config.ProtocolCanalJSON, config.ProtocolOpen:
552-
// for partition table, the canal-json and open-protocol message cannot assign physical table id to each dml message,
556+
case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro:
557+
// for partition table, the canal-json, avro and open-protocol message cannot assign physical table id to each dml message,
553558
// we cannot distinguish whether it's a real fallback event or not, still append it.
554559
if w.partitionTableAccessor.IsPartitionTable(schema, table) {
555-
log.Warn("DML events fallback, but it's canal-json or open-protocol and the table is a partition table, still append it",
560+
log.Warn("DML events fallback, but it's canal-json, avro or open-protocol and the table is a partition table, still append it",
556561
zap.Int32("partition", group.Partition), zap.Any("offset", offset),
557562
zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
558563
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),

cmd/pulsar-consumer/writer.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -396,19 +396,25 @@ func (w *writer) Write(ctx context.Context, messageType common.MessageType) bool
396396

397397
func (w *writer) onDDL(ddl *commonEvent.DDLEvent) {
398398
switch w.protocol {
399-
case config.ProtocolCanalJSON, config.ProtocolOpen:
399+
case config.ProtocolCanalJSON:
400400
default:
401401
return
402402
}
403-
if ddl.Type != byte(timodel.ActionCreateTable) {
404-
return
405-
}
406-
stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "")
407-
if err != nil {
408-
log.Panic("parse ddl query failed", zap.String("query", ddl.Query), zap.Error(err))
409-
}
410-
if v, ok := stmt.(*ast.CreateTableStmt); ok && v.Partition != nil {
411-
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
403+
// TODO: support more corner cases
404+
// e.g. create partition table + drop table(rename table) + create normal table: the partitionTableAccessor should drop the table when the table become normal.
405+
switch timodel.ActionType(ddl.Type) {
406+
case timodel.ActionCreateTable:
407+
stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "")
408+
if err != nil {
409+
log.Panic("parse ddl query failed", zap.String("query", ddl.Query), zap.Error(err))
410+
}
411+
if v, ok := stmt.(*ast.CreateTableStmt); ok && v.Partition != nil {
412+
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
413+
}
414+
case timodel.ActionRenameTable:
415+
if w.partitionTableAccessor.IsPartitionTable(ddl.ExtraSchemaName, ddl.ExtraTableName) {
416+
w.partitionTableAccessor.Add(ddl.GetSchemaName(), ddl.GetTableName())
417+
}
412418
}
413419
}
414420

pkg/sink/codec/common/table_info_cache.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,15 @@ func (m *PartitionTableAccessor) Add(schema, table string) {
116116
log.Info("add partition table to the accessor", zap.String("schema", schema), zap.String("table", table))
117117
}
118118

119+
func (m *PartitionTableAccessor) Drop(schema, table string) {
120+
key := newAccessKey(schema, table)
121+
_, ok := m.memo[key]
122+
if ok {
123+
delete(m.memo, key)
124+
log.Info("drop partition table to the accessor", zap.String("schema", schema), zap.String("table", table))
125+
}
126+
}
127+
119128
func (m *PartitionTableAccessor) IsPartitionTable(schema, table string) bool {
120129
key := newAccessKey(schema, table)
121130
_, ok := m.memo[key]

0 commit comments

Comments
 (0)