Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
92ee0e1
dispatcher,event,cloudstorage: add DML two-stage ack
3AceShowHand Feb 24, 2026
01bd19a
dispatcher,event,cloudstorage: fix enqueue callback semantics
3AceShowHand Feb 25, 2026
c113666
2 small changes
3AceShowHand Feb 25, 2026
6a1be2d
fix test
3AceShowHand Feb 25, 2026
2f8b51c
downstreamadapter,event: simplify sink wake callback flow
3AceShowHand Feb 26, 2026
71f7771
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Feb 26, 2026
841178b
some small changes
3AceShowHand Feb 26, 2026
81bbfce
downstreamadapter,dispatcher: refine sink wake callbacks
3AceShowHand Feb 26, 2026
3ded4e0
fix code
3AceShowHand Feb 26, 2026
cdf2a98
fix
3AceShowHand Feb 26, 2026
460ac1e
fix
3AceShowHand Feb 26, 2026
05f0792
cloudstorage: drain affected dispatchers before ddl write
3AceShowHand Feb 27, 2026
0b76095
add pass block event to the sink interface
3AceShowHand Feb 27, 2026
2cef2fe
Add more comment
3AceShowHand Feb 28, 2026
8e51ebb
rename methods
3AceShowHand Mar 2, 2026
c8bb040
fix the code
3AceShowHand Mar 2, 2026
229673c
fix the code
3AceShowHand Mar 2, 2026
442a724
fix a lot of code
3AceShowHand Mar 2, 2026
9993792
fix
3AceShowHand Mar 2, 2026
a619c8f
adjust comments
3AceShowHand Mar 3, 2026
da1038b
update comments
3AceShowHand Mar 3, 2026
7954404
Merge branch 'master' into storage-sink-two-stage-ack
3AceShowHand Mar 3, 2026
d52a1a1
add more logs
3AceShowHand Mar 4, 2026
839d01b
fix code
3AceShowHand Mar 4, 2026
08edc5a
add more log to the consumer
3AceShowHand Mar 4, 2026
d488388
introduce drain phase
3AceShowHand Mar 4, 2026
e83390a
only affect the storage sink
3AceShowHand Mar 4, 2026
517a7a9
add a lot of code
3AceShowHand Mar 4, 2026
c99534e
fix barrier
3AceShowHand Mar 5, 2026
2ed0030
update test
3AceShowHand Mar 5, 2026
cbbaf16
fix
3AceShowHand Mar 5, 2026
7dee7c8
Merge remote-tracking branch 'refs/remotes/origin/storage-sink-two-st…
3AceShowHand Mar 5, 2026
e20b493
fix make fmt
3AceShowHand Mar 5, 2026
b104191
fix
3AceShowHand Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/kafka-consumer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (s *recordingSink) IsNormal() bool { return true }
func (s *recordingSink) AddDMLEvent(_ *commonEvent.DMLEvent) {
}

// FlushDMLBeforeBlock satisfies the sink interface for this recording test
// double. It performs no work and always reports success, since the test
// only records block events (see WriteBlockEvent) and has no buffered DML
// to flush.
func (s *recordingSink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error {
	return nil
}

func (s *recordingSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
if ddl, ok := event.(*commonEvent.DDLEvent); ok {
s.ddls = append(s.ddls, ddl.Query)
Expand Down
4 changes: 4 additions & 0 deletions cmd/pulsar-consumer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (s *recordingSink) IsNormal() bool { return true }
func (s *recordingSink) AddDMLEvent(_ *commonEvent.DMLEvent) {
}

// FlushDMLBeforeBlock satisfies the sink interface for this recording test
// double. It performs no work and always reports success, since the test
// only records block events (see WriteBlockEvent) and has no buffered DML
// to flush.
func (s *recordingSink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error {
	return nil
}

func (s *recordingSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
if ddl, ok := event.(*commonEvent.DDLEvent); ok {
s.ddls = append(s.ddls, ddl.Query)
Expand Down
77 changes: 68 additions & 9 deletions cmd/storage-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type consumer struct {
errCh chan error

dmlCount atomic.Int64
readSeq atomic.Uint64
}

func newConsumer(ctx context.Context) (*consumer, error) {
Expand Down Expand Up @@ -352,6 +353,14 @@ func (c *consumer) flushDMLEvents(ctx context.Context, tableID int64) error {
if total == 0 {
return nil
}
var (
schema string
table string
)
if events[0].TableInfo != nil {
schema = events[0].TableInfo.GetSchemaName()
table = events[0].TableInfo.GetTableName()
}
var flushed atomic.Int64
done := make(chan struct{})
for _, e := range events {
Expand All @@ -373,6 +382,7 @@ func (c *consumer) flushDMLEvents(ctx context.Context, tableID int64) error {
return context.Cause(ctx)
case <-done:
log.Info("flush DML events done",
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
zap.Int("total", total), zap.Duration("duration", time.Since(start)))
return nil
case <-ticker.C:
Expand Down Expand Up @@ -506,15 +516,16 @@ func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage.
func (c *consumer) handleNewFiles(
ctx context.Context,
dmlFileMap map[cloudstorage.DmlPathKey]fileIndexRange,
round uint64,
) error {
if len(dmlFileMap) == 0 {
log.Info("no new dml files found since last round", zap.Uint64("round", round))
return nil
}
keys := make([]cloudstorage.DmlPathKey, 0, len(dmlFileMap))
for k := range dmlFileMap {
keys = append(keys, k)
}
if len(keys) == 0 {
log.Info("no new dml files found since last round")
return nil
}
sort.Slice(keys, func(i, j int) bool {
if keys[i].TableVersion != keys[j].TableVersion {
return keys[i].TableVersion < keys[j].TableVersion
Expand All @@ -531,10 +542,19 @@ func (c *consumer) handleNewFiles(
return keys[i].Table < keys[j].Table
})

for _, key := range keys {
for order, key := range keys {
tableDef := c.mustGetTableDef(key.SchemaPathKey)
tableKey := key.GetKey()
ddlWatermark := c.tableDDLWatermark[tableKey]
log.Info("storage consumer handle file key",
zap.Uint64("round", round),
zap.Int("order", order),
zap.String("schema", key.Schema),
zap.String("table", key.Table),
zap.Uint64("tableVersion", key.TableVersion),
zap.Int64("partition", key.PartitionNum),
zap.String("date", key.Date),
zap.Int("rangeCount", len(dmlFileMap[key])))

// if the key is a fake dml path key which is mainly used for
// sorting schema.json file before the dml files, then execute the ddl query.
Expand All @@ -547,6 +567,17 @@ func (c *consumer) handleNewFiles(
continue
}

seq := c.readSeq.Inc()
log.Info("storage consumer read ddl event",
zap.Uint64("seq", seq),
zap.Uint64("round", round),
zap.Int("order", order),
zap.String("schema", key.Schema),
zap.String("table", key.Table),
zap.Uint64("tableVersion", key.TableVersion),
zap.Uint64("ddlWatermark", ddlWatermark),
zap.String("query", tableDef.Query))

ddlEvent, err := tableDef.ToDDLEvent()
if err != nil {
return err
Expand Down Expand Up @@ -578,10 +609,26 @@ func (c *consumer) handleNewFiles(
fileRange := dmlFileMap[key]
for indexKey, indexRange := range fileRange {
for i := indexRange.start; i <= indexRange.end; i++ {
if err := c.appendDMLEvents(ctx, tableID, tableDef, key, &cloudstorage.FileIndex{
fileIndex := &cloudstorage.FileIndex{
FileIndexKey: indexKey,
Idx: i,
}); err != nil {
}
filePath := key.GenerateDMLFilePath(fileIndex, c.fileExtension, fileIndexWidth)
seq := c.readSeq.Inc()
log.Info("storage consumer read dml file",
zap.Uint64("seq", seq),
zap.Uint64("round", round),
zap.Int("order", order),
zap.String("schema", key.Schema),
zap.String("table", key.Table),
zap.Uint64("tableVersion", key.TableVersion),
zap.Int64("partition", key.PartitionNum),
zap.String("date", key.Date),
zap.String("dispatcher", indexKey.DispatcherID),
zap.Bool("enableTableAcrossNodes", indexKey.EnableTableAcrossNodes),
zap.Uint64("fileIndex", i),
zap.String("path", filePath))
if err := c.appendDMLEvents(ctx, tableID, tableDef, key, fileIndex); err != nil {
return err
}
}
Expand All @@ -595,7 +642,15 @@ func (c *consumer) handleNewFiles(
func (c *consumer) handle(ctx context.Context) error {
ticker := time.NewTicker(flushInterval)
logTicker := time.NewTicker(defaultLogInterval)
lastDMLCount := int64(0)
defer func() {
ticker.Stop()
logTicker.Stop()
}()

var (
lastDMLCount int64
round uint64
)
for {
select {
case <-ctx.Done():
Expand All @@ -612,12 +667,16 @@ func (c *consumer) handle(ctx context.Context) error {
case <-ticker.C:
}

round++
dmlFileMap, err := c.getNewFiles(ctx)
if err != nil {
return errors.Trace(err)
}
log.Info("storage consumer scan done",
zap.Uint64("round", round),
zap.Int("dmlPathKeyCount", len(dmlFileMap)))

err = c.handleNewFiles(ctx, dmlFileMap)
err = c.handleNewFiles(ctx, dmlFileMap, round)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading