Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions downstreamadapter/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {

rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
events := make([]*commonEvent.MQRowEvent, 0, rowsCount)

for {
row, ok := event.GetNextRow()
Expand All @@ -246,7 +247,7 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
return errors.Trace(err)
}

mqEvent := &commonEvent.MQRowEvent{
events = append(events, &commonEvent.MQRowEvent{
Key: commonEvent.TopicPartitionKey{
Topic: topic,
Partition: index,
Expand All @@ -263,9 +264,9 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
ColumnSelector: selector,
Checksum: row.Checksum,
},
}
s.rowChan.Push(mqEvent)
})
}
s.rowChan.Push(events...)
}
}
}
Expand Down Expand Up @@ -341,6 +342,7 @@ func (s *sink) batch(ctx context.Context, buffer []*commonEvent.MQRowEvent) ([]*
zap.String("changefeed", s.changefeedID.Name()))
return nil, nil
}
buffer = buffer[:0]
return msgs, nil
}
}
Expand Down
8 changes: 5 additions & 3 deletions downstreamadapter/sink/pulsar/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {

rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
events := make([]*commonEvent.MQRowEvent, 0, rowsCount)

for {
row, ok := event.GetNextRow()
Expand All @@ -356,7 +357,7 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
return errors.Trace(err)
}

mqEvent := &commonEvent.MQRowEvent{
events = append(events, &commonEvent.MQRowEvent{
Key: commonEvent.TopicPartitionKey{
Topic: topic,
Partition: index,
Expand All @@ -373,9 +374,9 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error {
ColumnSelector: selector,
Checksum: row.Checksum,
},
}
s.rowChan.Push(mqEvent)
})
}
s.rowChan.Push(events...)
}
}
}
Expand Down Expand Up @@ -444,6 +445,7 @@ func (s *sink) batch(ctx context.Context, buffer []*commonEvent.MQRowEvent, tick
zap.String("changefeed", s.changefeedID.Name()))
return nil, nil
}
buffer = buffer[:0]
return msgs, nil
}
}
Expand Down
24 changes: 18 additions & 6 deletions downstreamadapter/sink/redo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,17 @@ func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) {
}
}
}
rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)
rowsCount := event.Len()
rowCallback := toRowCallback(event.PostTxnFlushed, uint64(rowsCount))
events := make([]writer.RedoEvent, 0, rowsCount)

for {
row, ok := event.GetNextRow()
if !ok {
event.Rewind()
break
}
s.logBuffer.Push(&commonEvent.RedoRowEvent{
events = append(events, &commonEvent.RedoRowEvent{
StartTs: event.StartTs,
CommitTs: event.CommitTs,
Event: row,
Expand All @@ -157,6 +158,7 @@ func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) {
Callback: rowCallback,
})
}
s.logBuffer.Push(events...)
}

func (s *Sink) IsNormal() bool {
Expand Down Expand Up @@ -201,20 +203,30 @@ func (s *Sink) Close(_ bool) {
}

// sendMessages drains the redo log buffer in batches and hands each batch to
// the DML writer. It returns nil once the buffer is closed and fully drained,
// or the first context/write error otherwise.
func (s *Sink) sendMessages(ctx context.Context) error {
	// buffer is reused across iterations so batched reads do not reallocate.
	buffer := make([]writer.RedoEvent, 0, redo.DefaultFlushBatchSize)
	for {
		select {
		case <-ctx.Done():
			return errors.Trace(ctx.Err())
		default:
		}
		events, ok := s.logBuffer.GetMultipleNoGroup(buffer)
		if !ok {
			// The buffer has been closed and drained.
			return nil
		}
		if len(events) == 0 {
			continue
		}
		// Keep the backing array for the next read.
		buffer = events[:0]

		start := time.Now()
		err := s.dmlWriter.WriteEvents(ctx, events...)
		if err != nil {
			return err
		}

		if s.metric != nil {
			s.metric.observeRowWrite(len(events), time.Since(start))
		}
	}
}
Expand Down
61 changes: 61 additions & 0 deletions downstreamadapter/sink/redo/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/redo"
"github.com/pingcap/ticdc/pkg/redo/writer"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/utils/chann"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -329,3 +331,62 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {

require.ErrorIs(b, eg.Wait(), context.Canceled)
}

// mockBatchWriter records the length of every batch handed to WriteEvents so
// the test can assert on batching behavior. The mutex guards batchLens.
type mockBatchWriter struct {
	mu        sync.Mutex
	batchLens []int
}

// WriteEvents records the batch size, then acknowledges every event by
// invoking its PostFlush callback. It never returns an error.
func (m *mockBatchWriter) WriteEvents(_ context.Context, events ...writer.RedoEvent) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.batchLens = append(m.batchLens, len(events))
	for _, ev := range events {
		ev.PostFlush()
	}
	return nil
}

// Run blocks until the context is canceled and returns the context error.
func (m *mockBatchWriter) Run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// Close is a no-op; the mock holds no resources.
func (m *mockBatchWriter) Close() error {
	return nil
}

// SetTableSchemaStore is a no-op; the mock does not encode DDL events.
func (m *mockBatchWriter) SetTableSchemaStore(_ *commonEvent.TableSchemaStore) {}

// TestRedoSinkSendMessagesInBatch verifies that sendMessages drains the log
// buffer in DefaultFlushBatchSize-sized batches and flushes the remainder as
// a final short batch.
func TestRedoSinkSendMessagesInBatch(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	mockWriter := &mockBatchWriter{}
	s := &Sink{
		dmlWriter: mockWriter,
		logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](),
	}

	doneCh := make(chan error, 1)
	go func() {
		doneCh <- s.sendMessages(ctx)
	}()

	const remainder = 17
	total := redo.DefaultFlushBatchSize*2 + remainder
	batch := make([]writer.RedoEvent, total)
	for i := range batch {
		batch[i] = &commonEvent.RedoRowEvent{}
	}
	s.logBuffer.Push(batch...)
	s.logBuffer.Close()

	// sendMessages returns nil once the closed buffer is drained.
	require.NoError(t, <-doneCh)

	mockWriter.mu.Lock()
	defer mockWriter.mu.Unlock()
	require.Equal(t,
		[]int{redo.DefaultFlushBatchSize, redo.DefaultFlushBatchSize, remainder},
		mockWriter.batchLens)
}
184 changes: 184 additions & 0 deletions pkg/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"context"
"encoding/binary"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/redo"
"github.com/pingcap/ticdc/pkg/redo/codec"
"github.com/pingcap/ticdc/pkg/redo/writer"
"github.com/pingcap/ticdc/pkg/util"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// polymorphicRedoEvent wraps an encoded redo log entry together with the
// callback that must run after the entry has been flushed by the file worker.
type polymorphicRedoEvent struct {
	commitTs common.Ts // commit timestamp taken from the marshaled redo log
	data     []byte    // framed bytes: 8-byte length header + payload + padding
	callback func()    // invoked by PostFlush once the entry is durable; may be nil
}

// PostFlush runs the completion callback, if one was attached. Safe to call
// when callback is nil.
func (e *polymorphicRedoEvent) PostFlush() {
	if e.callback != nil {
		e.callback()
	}
}

// toPolymorphicRedoEvent converts a writer.RedoEvent into its encoded,
// frame-prefixed form ready to be written by the file worker. DDL redo logs
// receive the table schema store before marshaling. The returned event keeps
// the source event's PostFlush as its completion callback.
func toPolymorphicRedoEvent(
	event writer.RedoEvent,
	tableSchemaStore *commonEvent.TableSchemaStore,
) (*polymorphicRedoEvent, error) {
	redoLog := event.ToRedoLog()
	if redoLog == nil {
		return nil, errors.ErrUnexpected.FastGenByArgs("redo event to log conversion failed")
	}
	if redoLog.Type == commonEvent.RedoLogTypeDDL {
		redoLog.RedoDDL.SetTableSchemaStore(tableSchemaStore)
	}

	payload, err := codec.MarshalRedoLog(redoLog, nil)
	if err != nil {
		return nil, errors.WrapError(errors.ErrMarshalFailed, err)
	}

	// Frame layout: 8-byte little-endian length header, payload, zero padding.
	header, padding := writer.EncodeFrameSize(len(payload))
	framed := make([]byte, 8+len(payload)+padding)
	binary.LittleEndian.PutUint64(framed, header)
	copy(framed[8:], payload)

	return &polymorphicRedoEvent{
		commitTs: redoLog.GetCommitTs(),
		callback: event.PostFlush,
		data:     framed,
	}, nil
}

// encodingWorkerGroup fans redo events out to a pool of encoding workers and
// funnels the encoded results into a single shared output channel.
type encodingWorkerGroup struct {
	changefeed common.ChangeFeedID

	// outputCh receives encoded events from all workers.
	outputCh chan *polymorphicRedoEvent
	// inputChs holds one buffered input channel per worker; AddEvent
	// round-robins submissions across them.
	inputChs  []chan writer.RedoEvent
	workerNum int

	// nextWorker is the monotonically increasing round-robin counter used by
	// AddEvent to pick a worker.
	nextWorker atomic.Uint64
	// closed carries the terminal error (buffered, capacity 1) and is closed
	// when Run exits, releasing blocked input/output callers.
	closed chan error

	// tableSchemaStore is attached to DDL redo logs before marshaling.
	tableSchemaStore *commonEvent.TableSchemaStore
}

// newEncodingWorkerGroup builds an encodingWorkerGroup from the writer
// configuration, falling back to redo.DefaultEncodingWorkerNum when the
// configured worker count is missing or non-positive.
func newEncodingWorkerGroup(cfg *writer.LogWriterConfig) *encodingWorkerGroup {
	n := util.GetOrZero(cfg.EncodingWorkerNum)
	if n <= 0 {
		n = redo.DefaultEncodingWorkerNum
	}
	inputs := make([]chan writer.RedoEvent, n)
	for i := range inputs {
		inputs[i] = make(chan writer.RedoEvent, redo.DefaultEncodingInputChanSize)
	}
	return &encodingWorkerGroup{
		changefeed: cfg.ChangeFeedID,
		inputChs:   inputs,
		outputCh:   make(chan *polymorphicRedoEvent, redo.DefaultEncodingOutputChanSize),
		workerNum:  n,
		closed:     make(chan error, 1),
	}
}

// Run starts workerNum encoding goroutines and blocks until they all stop,
// returning the first worker error (or the context error). On exit it
// publishes the error, if any, on e.closed and then closes the channel so
// that callers blocked in input/output are released.
func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
	defer func() {
		log.Warn("redo encoding workers closed",
			zap.String("keyspace", e.changefeed.Keyspace()),
			zap.String("changefeed", e.changefeed.Name()),
			zap.Error(err))
		if err != nil {
			// Non-blocking send: the buffered channel retains the first error;
			// later errors are dropped.
			select {
			case e.closed <- err:
			default:
			}
		}
		close(e.closed)
	}()
	g, egCtx := errgroup.WithContext(ctx)
	for i := 0; i < e.workerNum; i++ {
		idx := i
		g.Go(func() error {
			return e.runWorker(egCtx, idx)
		})
	}
	log.Info("redo log encoding workers started",
		zap.String("keyspace", e.changefeed.Keyspace()),
		zap.String("changefeed", e.changefeed.Name()),
		zap.Int("workerNum", e.workerNum))
	return g.Wait()
}

// AddEvent dispatches event to one of the encoding workers, distributing
// submissions round-robin across the input channels. It blocks until the
// event is accepted, the context is canceled, or the group has stopped.
func (e *encodingWorkerGroup) AddEvent(ctx context.Context, event writer.RedoEvent) error {
	seq := e.nextWorker.Inc() - 1
	return e.input(ctx, int(seq%uint64(e.workerNum)), event)
}

// runWorker is the loop of a single encoding worker: it drains its dedicated
// input channel, encodes each event, and forwards the result to the shared
// output channel until the context is canceled or an error occurs.
func (e *encodingWorkerGroup) runWorker(egCtx context.Context, idx int) error {
	in := e.inputChs[idx]
	for {
		var event writer.RedoEvent
		select {
		case <-egCtx.Done():
			return errors.Trace(egCtx.Err())
		case event = <-in:
		}
		if event == nil {
			// Defensive: a nil event carries nothing to encode; log and move on.
			log.Warn("received nil event in redo encoding worker",
				zap.String("keyspace", e.changefeed.Keyspace()),
				zap.String("changefeed", e.changefeed.Name()))
			continue
		}
		encoded, err := toPolymorphicRedoEvent(event, e.tableSchemaStore)
		if err != nil {
			return errors.Trace(err)
		}
		if err = e.output(egCtx, encoded); err != nil {
			return errors.Trace(err)
		}
	}
}

// input hands event to worker idx, honoring context cancellation and group
// shutdown while waiting for channel capacity.
func (e *encodingWorkerGroup) input(
	ctx context.Context, idx int, event writer.RedoEvent,
) error {
	ch := e.inputChs[idx]
	select {
	case ch <- event:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case err := <-e.closed:
		return errors.ErrRedoWriterStopped.FastGenByArgs(err)
	}
}

// output forwards an encoded event to the shared output channel, honoring
// context cancellation and group shutdown while waiting for capacity.
func (e *encodingWorkerGroup) output(
	ctx context.Context, event *polymorphicRedoEvent,
) error {
	select {
	case e.outputCh <- event:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case err := <-e.closed:
		return errors.ErrRedoWriterStopped.FastGenByArgs(err)
	}
}
Loading