Skip to content

Commit 29db6eb

Browse files
authored
dispatcher,event,cloudstorage: add DML two-stage ack (#4263)
close #4269
1 parent 82a73ed commit 29db6eb

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

51 files changed

+2229
-593
lines changed

cmd/kafka-consumer/writer_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func (s *recordingSink) IsNormal() bool { return true }
4444
func (s *recordingSink) AddDMLEvent(_ *commonEvent.DMLEvent) {
4545
}
4646

47+
func (s *recordingSink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error {
48+
return nil
49+
}
50+
4751
func (s *recordingSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
4852
if ddl, ok := event.(*commonEvent.DDLEvent); ok {
4953
s.ddls = append(s.ddls, ddl.Query)

cmd/pulsar-consumer/writer_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ func (s *recordingSink) IsNormal() bool { return true }
4242
func (s *recordingSink) AddDMLEvent(_ *commonEvent.DMLEvent) {
4343
}
4444

45+
func (s *recordingSink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error {
46+
return nil
47+
}
48+
4549
func (s *recordingSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
4650
if ddl, ok := event.(*commonEvent.DDLEvent); ok {
4751
s.ddls = append(s.ddls, ddl.Query)

cmd/storage-consumer/consumer.go

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type consumer struct {
7575
errCh chan error
7676

7777
dmlCount atomic.Int64
78+
readSeq atomic.Uint64
7879
}
7980

8081
func newConsumer(ctx context.Context) (*consumer, error) {
@@ -352,6 +353,14 @@ func (c *consumer) flushDMLEvents(ctx context.Context, tableID int64) error {
352353
if total == 0 {
353354
return nil
354355
}
356+
var (
357+
schema string
358+
table string
359+
)
360+
if events[0].TableInfo != nil {
361+
schema = events[0].TableInfo.GetSchemaName()
362+
table = events[0].TableInfo.GetTableName()
363+
}
355364
var flushed atomic.Int64
356365
done := make(chan struct{})
357366
for _, e := range events {
@@ -373,6 +382,7 @@ func (c *consumer) flushDMLEvents(ctx context.Context, tableID int64) error {
373382
return context.Cause(ctx)
374383
case <-done:
375384
log.Info("flush DML events done",
385+
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
376386
zap.Int("total", total), zap.Duration("duration", time.Since(start)))
377387
return nil
378388
case <-ticker.C:
@@ -506,15 +516,16 @@ func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage.
506516
func (c *consumer) handleNewFiles(
507517
ctx context.Context,
508518
dmlFileMap map[cloudstorage.DmlPathKey]fileIndexRange,
519+
round uint64,
509520
) error {
521+
if len(dmlFileMap) == 0 {
522+
log.Info("no new dml files found since last round", zap.Uint64("round", round))
523+
return nil
524+
}
510525
keys := make([]cloudstorage.DmlPathKey, 0, len(dmlFileMap))
511526
for k := range dmlFileMap {
512527
keys = append(keys, k)
513528
}
514-
if len(keys) == 0 {
515-
log.Info("no new dml files found since last round")
516-
return nil
517-
}
518529
sort.Slice(keys, func(i, j int) bool {
519530
if keys[i].TableVersion != keys[j].TableVersion {
520531
return keys[i].TableVersion < keys[j].TableVersion
@@ -531,10 +542,19 @@ func (c *consumer) handleNewFiles(
531542
return keys[i].Table < keys[j].Table
532543
})
533544

534-
for _, key := range keys {
545+
for order, key := range keys {
535546
tableDef := c.mustGetTableDef(key.SchemaPathKey)
536547
tableKey := key.GetKey()
537548
ddlWatermark := c.tableDDLWatermark[tableKey]
549+
log.Info("storage consumer handle file key",
550+
zap.Uint64("round", round),
551+
zap.Int("order", order),
552+
zap.String("schema", key.Schema),
553+
zap.String("table", key.Table),
554+
zap.Uint64("tableVersion", key.TableVersion),
555+
zap.Int64("partition", key.PartitionNum),
556+
zap.String("date", key.Date),
557+
zap.Int("rangeCount", len(dmlFileMap[key])))
538558

539559
// if the key is a fake dml path key which is mainly used for
540560
// sorting schema.json file before the dml files, then execute the ddl query.
@@ -547,6 +567,17 @@ func (c *consumer) handleNewFiles(
547567
continue
548568
}
549569

570+
seq := c.readSeq.Inc()
571+
log.Info("storage consumer read ddl event",
572+
zap.Uint64("seq", seq),
573+
zap.Uint64("round", round),
574+
zap.Int("order", order),
575+
zap.String("schema", key.Schema),
576+
zap.String("table", key.Table),
577+
zap.Uint64("tableVersion", key.TableVersion),
578+
zap.Uint64("ddlWatermark", ddlWatermark),
579+
zap.String("query", tableDef.Query))
580+
550581
ddlEvent, err := tableDef.ToDDLEvent()
551582
if err != nil {
552583
return err
@@ -578,10 +609,26 @@ func (c *consumer) handleNewFiles(
578609
fileRange := dmlFileMap[key]
579610
for indexKey, indexRange := range fileRange {
580611
for i := indexRange.start; i <= indexRange.end; i++ {
581-
if err := c.appendDMLEvents(ctx, tableID, tableDef, key, &cloudstorage.FileIndex{
612+
fileIndex := &cloudstorage.FileIndex{
582613
FileIndexKey: indexKey,
583614
Idx: i,
584-
}); err != nil {
615+
}
616+
filePath := key.GenerateDMLFilePath(fileIndex, c.fileExtension, fileIndexWidth)
617+
seq := c.readSeq.Inc()
618+
log.Info("storage consumer read dml file",
619+
zap.Uint64("seq", seq),
620+
zap.Uint64("round", round),
621+
zap.Int("order", order),
622+
zap.String("schema", key.Schema),
623+
zap.String("table", key.Table),
624+
zap.Uint64("tableVersion", key.TableVersion),
625+
zap.Int64("partition", key.PartitionNum),
626+
zap.String("date", key.Date),
627+
zap.String("dispatcher", indexKey.DispatcherID),
628+
zap.Bool("enableTableAcrossNodes", indexKey.EnableTableAcrossNodes),
629+
zap.Uint64("fileIndex", i),
630+
zap.String("path", filePath))
631+
if err := c.appendDMLEvents(ctx, tableID, tableDef, key, fileIndex); err != nil {
585632
return err
586633
}
587634
}
@@ -595,7 +642,15 @@ func (c *consumer) handleNewFiles(
595642
func (c *consumer) handle(ctx context.Context) error {
596643
ticker := time.NewTicker(flushInterval)
597644
logTicker := time.NewTicker(defaultLogInterval)
598-
lastDMLCount := int64(0)
645+
defer func() {
646+
ticker.Stop()
647+
logTicker.Stop()
648+
}()
649+
650+
var (
651+
lastDMLCount int64
652+
round uint64
653+
)
599654
for {
600655
select {
601656
case <-ctx.Done():
@@ -612,12 +667,16 @@ func (c *consumer) handle(ctx context.Context) error {
612667
case <-ticker.C:
613668
}
614669

670+
round++
615671
dmlFileMap, err := c.getNewFiles(ctx)
616672
if err != nil {
617673
return errors.Trace(err)
618674
}
675+
log.Info("storage consumer scan done",
676+
zap.Uint64("round", round),
677+
zap.Int("dmlPathKeyCount", len(dmlFileMap)))
619678

620-
err = c.handleNewFiles(ctx, dmlFileMap)
679+
err = c.handleNewFiles(ctx, dmlFileMap, round)
621680
if err != nil {
622681
return errors.Trace(err)
623682
}

0 commit comments

Comments
 (0)