
redo: improve redo writer #4353

Open
wk989898 wants to merge 7 commits into pingcap:master from wk989898:redo-0304

Conversation

@wk989898
Collaborator

@wk989898 wk989898 commented Mar 4, 2026

What problem does this PR solve?

Issue Number: ref #1061

What is changed and how it works?

  • Batching of Events: Implemented batching for MQRowEvents in the Kafka and Pulsar sinks, and for RedoRowEvents in the redo sink, improving efficiency by pushing multiple events at once.
  • Asynchronous Encoding Pipeline: Introduced a dedicated encodingWorkerGroup to parallelize the encoding of redo events, separating this task from the file-writing process.
  • Refactored Redo Sink Message Sending: The redo sink's sendMessages function was updated to fetch and write events in batches, leveraging the new encoding pipeline.
  • Optimized Buffer Management: Added buffer clearing (buffer = buffer[:0]) in batch-processing functions to facilitate buffer reuse and reduce allocations.
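The buffer-reuse pattern described above can be sketched as follows. This is a simplified stand-in, not the PR's actual code: `rowEvent`, `pushBatch`, and `runBatches` are illustrative names, and the real sinks use their own event types (MQRowEvent / RedoRowEvent).

```go
package main

import "fmt"

// rowEvent is a simplified stand-in for a sink's row-event type.
type rowEvent struct{ commitTs uint64 }

// pushBatch stands in for handing a whole batch to the downstream writer.
func pushBatch(events []rowEvent) {
	fmt.Printf("pushed %d events\n", len(events))
}

// runBatches fills, pushes, and resets one reusable buffer three times.
// It returns the number of pushes and whether capacity was retained.
func runBatches() (pushes int, capRetained bool) {
	buffer := make([]rowEvent, 0, 16) // preallocated once
	for batch := 0; batch < 3; batch++ {
		for i := 0; i < 4; i++ {
			buffer = append(buffer, rowEvent{commitTs: uint64(batch*4 + i)})
		}
		pushBatch(buffer)
		// Reset length but keep capacity: the next batch reuses the
		// same backing array, avoiding per-batch reallocations.
		buffer = buffer[:0]
		pushes++
	}
	return pushes, cap(buffer) >= 16
}

func main() {
	p, ok := runBatches()
	fmt.Println("batches:", p, "capacity retained:", ok)
}
```

Note two caveats with this idiom: it is only safe when the consumer finishes with the slice before the caller resets it (otherwise the next batch overwrites data still being read), and `buffer[:0]` keeps old elements reachable in the backing array, which can delay garbage collection when events hold large payloads.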

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Performance

    • Improved throughput by batching row events, flushing buffers between batches, and writing events in groups.
  • New Features

    • Added an in-memory encoding worker group to parallelize event encoding and enable concurrent encode+write paths.
    • Introduced mock implementations to aid testing.
  • Bug Fixes

    • Ensure buffers are reset between batches and send loops respect cancellation to avoid stale or stuck batches.
  • Tests

    • Added tests verifying batched send behavior, batch-sizing, and encoding-worker initialization.
  • Refactor

    • Simplified redo write flow to operate on polymorphic event payloads and report batch-aware metrics.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot ti-chi-bot bot added the do-not-merge/needs-linked-issue, release-note (denotes a PR that will be considered when it comes time to generate release notes), and size/XL (denotes a PR that changes 500-999 lines, ignoring generated files) labels Mar 4, 2026
@wk989898 wk989898 mentioned this pull request Mar 4, 2026
7 tasks
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the performance and efficiency of the redo log writer by introducing an asynchronous, multi-threaded encoding pipeline. It decouples the CPU-intensive event encoding from the I/O-bound file writing, allowing these operations to run in parallel. Additionally, it incorporates batch processing improvements across Kafka, Pulsar, and redo sinks, leading to more efficient event handling and reduced overhead.

Highlights

  • Batching of Events: Implemented batching for MQRowEvents in the Kafka and Pulsar sinks, and for RedoRowEvents in the redo sink, improving efficiency by pushing multiple events at once.
  • Asynchronous Encoding Pipeline: Introduced a dedicated encodingWorkerGroup to parallelize the encoding of redo events, separating this task from the file-writing process.
  • Refactored Redo Sink Message Sending: The redo sink's sendMessages function was updated to fetch and write events in batches, leveraging the new encoding pipeline.
  • Optimized Buffer Management: Added buffer clearing (buffer = buffer[:0]) in batch-processing functions to facilitate buffer reuse and reduce allocations.


Changelog
  • downstreamadapter/sink/kafka/sink.go
    • Updated to collect MQRowEvents into a slice and push them in a single batch, and added buffer clearing.
  • downstreamadapter/sink/pulsar/sink.go
    • Updated to collect MQRowEvents into a slice and push them in a single batch, and added buffer clearing.
  • downstreamadapter/sink/redo/sink.go
    • Modified AddDMLEvent to append RedoRowEvents to a slice and push them in a batch, and refactored sendMessages to retrieve and write multiple events at once.
  • downstreamadapter/sink/redo/sink_test.go
    • Added TestRedoSinkSendMessagesInBatch to verify the new batching logic in the redo sink.
  • pkg/redo/writer/memory/encoding_worker.go
    • Added a new file defining encodingWorkerGroup for parallel event encoding, managing input/output channels and worker execution.
  • pkg/redo/writer/memory/encoding_worker_test.go
    • Added unit tests for the encodingWorkerGroup's initialization and worker count.
  • pkg/redo/writer/memory/file_worker.go
    • Refactored to consume pre-encoded polymorphicRedoEvent's, removing its internal encoding responsibilities and simplifying syncWrite and writeToCache.
  • pkg/redo/writer/memory/mem_log_writer.go
    • Integrated the encodingWorkerGroup into the memoryLogWriter, enabling asynchronous encoding and coordinating its execution with the file writing workers.
Activity
  • No specific activity has been recorded for this pull request yet.

@coderabbitai
Contributor

coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Batch per-row events into preallocated slices for the Kafka, Pulsar, and redo sinks; reset batch buffers after consumption; add an encoding worker group that produces framed polymorphic redo events and pipes its output into the memory file writers so encoding and file writing run concurrently.

Changes

Cohort / File(s) Summary
Sink batching
downstreamadapter/sink/kafka/sink.go, downstreamadapter/sink/pulsar/sink.go, downstreamadapter/sink/redo/sink.go
Accumulate per-event row objects into preallocated slices and push each event's slice with a single Push(...events) call; convert row counts to int for allocation and pass uint64 where required; reset batch buffers (buffer = buffer[:0]) after consumption; redo sink send loop now supports context cancellation and batch writes.
Redo sink tests
downstreamadapter/sink/redo/sink_test.go
Adds TestRedoSinkSendMessagesInBatch using a mock redo writer and channel-based log buffer to validate batched send behavior across multiple batches.
Encoding worker (new)
pkg/redo/writer/memory/encoding_worker.go, pkg/redo/writer/memory/encoding_worker_test.go
New polymorphicRedoEvent and encodingWorkerGroup to marshal/frame redo logs into byte payloads with configurable workers and per-worker input channels; tests verify worker count and channel initialization.
Memory writer refactor
pkg/redo/writer/memory/file_worker.go, pkg/redo/writer/memory/mem_log_writer.go
Refactors file worker and mem_log_writer to consume *polymorphicRedoEvent, removes in-file encoding, wires encoding output into file-writing via channels, and runs encoding + file-writing concurrently (errgroup). Adjusts write paths to accept polymorphic events.
Mocks & scripts
pkg/redo/writer/writer_mock.go, scripts/generate-mock.sh
Adds GoMock-generated mocks for redo writer interfaces and a script step to generate them.
Manifest
go.mod
Module manifest updated (indirect changes).

Sequence Diagram(s)

sequenceDiagram
    participant Producer as Client/Producer
    participant EncWg as EncodingWorkerGroup
    participant FileWg as FileWorkerGroup
    participant Storage as FileCache/Disk

    Producer->>EncWg: AddEvent(redoEvent)
    EncWg->>EncWg: toPolymorphicRedoEvent (marshal, frame)
    EncWg->>FileWg: Send *polymorphicRedoEvent
    FileWg->>Storage: Write(event.data, commitTs)
    Storage->>Storage: Buffer & flush to disk
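The pipeline in the diagram above can be sketched in Go. Everything here is illustrative: `redoEvent`, `encoded`, and `runPipeline` are simplified stand-ins for the PR's types (the real code uses per-worker input channels inside encodingWorkerGroup and errgroup for coordination; this sketch uses only the standard library).

```go
package main

import (
	"fmt"
	"sync"
)

// redoEvent / encoded are simplified stand-ins for the PR's event types.
type redoEvent struct {
	commitTs uint64
	payload  string
}

type encoded struct {
	commitTs uint64
	data     []byte
}

// encode stands in for marshalling and framing one redo log entry.
func encode(ev redoEvent) encoded {
	return encoded{commitTs: ev.commitTs, data: []byte(ev.payload)}
}

// runPipeline fans events out to per-worker input channels, encodes them
// concurrently, and funnels results to a single consumer (the file writer
// in the PR). It returns how many encoded events reached the consumer.
func runPipeline(events []redoEvent, workers int) int {
	inputChs := make([]chan redoEvent, workers)
	for i := range inputChs {
		inputChs[i] = make(chan redoEvent, 8)
	}
	outputCh := make(chan encoded, 8)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(in <-chan redoEvent) { // encoding worker
			defer wg.Done()
			for ev := range in {
				outputCh <- encode(ev)
			}
		}(inputChs[i])
	}
	// Close the output once every encoding worker has drained its input.
	go func() { wg.Wait(); close(outputCh) }()

	// Dispatcher: shard events across workers, then signal completion.
	go func() {
		for _, ev := range events {
			inputChs[int(ev.commitTs)%workers] <- ev
		}
		for _, ch := range inputChs {
			close(ch)
		}
	}()

	written := 0
	for range outputCh { // the single file-writer goroutine
		written++
	}
	return written
}

func main() {
	evs := make([]redoEvent, 10)
	for i := range evs {
		evs[i] = redoEvent{commitTs: uint64(i), payload: "row"}
	}
	fmt.Println("written:", runPipeline(evs, 3))
}
```

Sharding by a stable key (here commitTs modulo the worker count) keeps events for the same shard on one worker; note that the single shared output channel does not preserve global commit-ts order across workers, which is why ordering guarantees have to come from elsewhere in the real design.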

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

lgtm, size/XXL

Suggested reviewers

  • wlwilliamx
  • hongyunyan
  • tenfyzhong
  • asddongmen

Poem

🐰 I stacked my rows in tidy rows,
Batched them up where soft wind blows,
Workers hum and frames take flight,
Files grow fat through day and night,
Hopping off, I dream in bytes.

🚥 Pre-merge checks | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

  • Description check — ⚠️ Warning. The PR description covers the main changes and includes a filled "What is changed" section, but lacks an Issue Number in the required format and leaves key template questions unanswered. Resolution: add a properly formatted "Issue Number:" line using "close" or "ref", and answer the performance/compatibility and documentation questions.
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 44.44%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Title check — ❓ Inconclusive. The title "redo: improve redo writer" is vague, relying on the non-descriptive term "improve". Resolution: consider a more specific title, such as "redo: batch events in encoding pipeline" or "redo: add asynchronous encoding worker group".

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@wk989898
Collaborator Author

wk989898 commented Mar 4, 2026

/test all


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

The pull request refactors the redo log writer by separating event encoding into a dedicated encodingWorkerGroup to improve modularity and enable parallel processing. It also updates Kafka and Pulsar sinks to use batching for better throughput. However, a critical reliability issue was identified in the fileWorkerGroup where unchecked nil values from newFileCache can cause process panics, requiring proper error handling at all call sites of newFileCache.

Comment on lines +266 to +267
file := f.newFileCache(data, event.commitTs)
if err := f.syncWriteFile(egCtx, file); err != nil {


security-high

The newFileCache function can return nil if an error occurs during the initial write to the buffer (e.g., if LZ4 compression fails). The current implementation does not check for this nil return value before passing it to syncWriteFile, which will result in a nil pointer dereference and a process panic when accessing file.maxCommitTs or other fields. This can be used to cause a Denial of Service (DoS) by crashing the TiCDC process.

file := f.newFileCache(data, event.commitTs)
if file == nil {
	return errors.ErrUnexpected.FastGenByArgs("failed to create file cache")
}
if err := f.syncWriteFile(egCtx, file); err != nil {

Comment on lines +351 to 352
file := f.newFileCache(data, commitTs)
f.files = append(f.files, file)


security-high

The newFileCache function can return nil if an error occurs. Appending a nil value to f.files will cause a panic in subsequent calls to writeToCache when it attempts to access the last element of the slice (e.g., file.fileSize). This leads to a process crash and Denial of Service.

file := f.newFileCache(data, commitTs)
if file == nil {
	return errors.ErrUnexpected.FastGenByArgs("failed to create file cache")
}
f.files = append(f.files, file)

Comment on lines +363 to 364
file := f.newFileCache(data, commitTs)
f.files = append(f.files, file)


security-high

Similar to the previous finding, newFileCache is called and its result is appended to f.files without a nil check. This will cause a panic in later operations that assume all elements in f.files are non-nil.

file := f.newFileCache(data, commitTs)
if file == nil {
	return errors.ErrUnexpected.FastGenByArgs("failed to create file cache")
}
f.files = append(f.files, file)

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/kafka/sink.go`:
- Around line 234-237: The capacity passed to make() must be an int; change the
rowsCount declaration from uint64 to an int by using int(event.Len()) (or
introduce an int variable rowsCountInt := int(event.Len())) and then use that
int for events := make([]*commonEvent.MQRowEvent, 0, rowsCountInt); if you still
need a uint64 version for toRowCallback or elsewhere, derive it with
uint64(rowsCountInt) when calling toRowCallback or other functions (update
references to rowsCount accordingly). Ensure the symbols involved are rowsCount
(replace type), event.Len(), toRowCallback, and events := make(...).

In `@pkg/redo/writer/memory/file_worker.go`:
- Around line 213-214: The bgWriteLogs function (signature with egCtx
context.Context, inputCh <-chan *polymorphicRedoEvent) treats a closed inputCh
as an unexpected error; change the receive logic to detect a closed channel (use
the comma-ok form when reading from inputCh) and treat that as a normal shutdown
path by returning context.Canceled (or nil if the surrounding contract expects
no error) instead of producing an error; ensure any loop or single-receive code
that currently assumes a value exists is updated to check ok and cleanly exit
using egCtx for cancellation propagation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b8ad85f1-e740-4119-9439-dc7da2e0824b

📥 Commits

Reviewing files that changed from the base of the PR and between 039417c and 486f86a.

📒 Files selected for processing (8)
  • downstreamadapter/sink/kafka/sink.go
  • downstreamadapter/sink/pulsar/sink.go
  • downstreamadapter/sink/redo/sink.go
  • downstreamadapter/sink/redo/sink_test.go
  • pkg/redo/writer/memory/encoding_worker.go
  • pkg/redo/writer/memory/encoding_worker_test.go
  • pkg/redo/writer/memory/file_worker.go
  • pkg/redo/writer/memory/mem_log_writer.go

Comment on lines +213 to 214
egCtx context.Context, inputCh <-chan *polymorphicRedoEvent,
) (err error) {
Contributor


⚠️ Potential issue | 🔴 Critical

Handle closed inputCh as a normal shutdown path.

bgWriteLogs now consumes an upstream channel that may be intentionally closed (from the encoding worker group). The current single-value receive path turns this into an unexpected error, which can surface on normal cancellation instead of returning context.Canceled.

🔧 Proposed fix
-		case event := <-inputCh:
-			if event == nil {
-				log.Error("inputCh of redo file worker is closed unexpectedly")
-				return errors.ErrUnexpected.FastGenByArgs("inputCh of redo file worker is closed unexpectedly")
-			}
+		case event, ok := <-inputCh:
+			if !ok {
+				if err := egCtx.Err(); err != nil {
+					return errors.Trace(err)
+				}
+				log.Info("redo file worker input channel closed",
+					zap.String("keyspace", f.cfg.ChangeFeedID.Keyspace()),
+					zap.String("changefeed", f.cfg.ChangeFeedID.Name()))
+				return nil
+			}
+			if event == nil {
+				log.Warn("received nil redo event in file worker",
+					zap.String("keyspace", f.cfg.ChangeFeedID.Keyspace()),
+					zap.String("changefeed", f.cfg.ChangeFeedID.Name()))
+				continue
+			}

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@wk989898
Collaborator Author

wk989898 commented Mar 4, 2026

/test all

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@wk989898
Collaborator Author

wk989898 commented Mar 5, 2026

/test all

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
pkg/redo/writer/memory/encoding_worker.go (1)

104-117: Use Warn only on error shutdowns; use Info for normal stops.

Line 106 currently logs a warning even when err == nil, which can create noisy warning signals during expected lifecycle shutdown.

Proposed refactor
 	defer func() {
-		log.Warn("redo encoding workers closed",
-			zap.String("keyspace", e.changefeed.Keyspace()),
-			zap.String("changefeed", e.changefeed.Name()),
-			zap.Error(err))
+		if err != nil {
+			log.Warn("redo encoding workers closed",
+				zap.String("keyspace", e.changefeed.Keyspace()),
+				zap.String("changefeed", e.changefeed.Name()),
+				zap.Error(err))
+		} else {
+			log.Info("redo encoding workers closed",
+				zap.String("keyspace", e.changefeed.Keyspace()),
+				zap.String("changefeed", e.changefeed.Name()))
+		}
 		if err != nil {
 			select {
 			case e.closed <- err:
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/redo/writer/memory/encoding_worker.go`:
- Around line 58-60: The marshal error from codec.MarshalRedoLog in
encoding_worker.go should be wrapped with the standardized ErrMarshalFailed
before returning; replace the current direct return of err after the call to
codec.MarshalRedoLog(rl, nil) with returning
errors.WrapError(errors.ErrMarshalFailed, err) (use the same pattern as in
pkg/redo/writer/file/file.go) so callers can consistently classify marshal
failures.
- Around line 163-173: The select in input() and output() allows sends to
e.inputChs[idx] or outputCh to succeed concurrently with reading from e.closed,
causing events to be accepted after shutdown; add an atomic boolean (e.g.,
e.stopped or e.shuttingDown using sync/atomic) that's set when closing e.closed,
and check that flag before attempting any send in both input() and output(): if
the flag indicates shutdown, return the appropriate errors.ErrRedoWriterStopped
(or wrap) instead of sending; only perform the channel send when the atomic
check shows not-shutdown to ensure sends cannot succeed once shutdown begins.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 677165b2-1907-4076-9e9a-7fa46447d256

📥 Commits

Reviewing files that changed from the base of the PR and between 31592de and e29004d.

📒 Files selected for processing (2)
  • pkg/redo/writer/memory/encoding_worker.go
  • pkg/redo/writer/memory/mem_log_writer.go

Comment on lines +163 to +173
select {
case <-ctx.Done():
return ctx.Err()
case err := <-e.closed:
if err != nil {
return errors.WrapError(errors.ErrRedoWriterStopped, err)
}
return errors.ErrRedoWriterStopped
case e.inputChs[idx] <- event:
return nil
}
Contributor


⚠️ Potential issue | 🔴 Critical

select statement allows channel sends to succeed after shutdown, risking silent event loss.

In the input() (lines 163-173) and output() (lines 179-189) functions, the <-e.closed and channel send cases coexist in the same select statement. When e.closed is closed (line 116), both branches become simultaneously ready—reading from closed e.closed returns immediately, and sends to inputChs[idx] or outputCh can still succeed if channels have buffer space. This allows callers to return success even after shutdown has begun. Once the worker goroutines exit, events sent after shutdown will accumulate in the channels and be silently discarded.

A safer approach is to guard send paths with an atomic flag or similar mechanism that prevents any sends from succeeding post-shutdown.


Signed-off-by: wk989898 <nhsmwk@gmail.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
pkg/redo/writer/memory/encoding_worker.go (1)

163-170: ⚠️ Potential issue | 🔴 Critical

Prevent post-shutdown event acceptance in input/output.

After e.closed is closed in Run, both <-e.closed and the send branch can be ready in the same select. A send can win and return success after shutdown starts, which risks silently dropping accepted events.

Also applies to: 176-183

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/redo/writer/memory/encoding_worker.go`:
- Around line 164-165: Change the bare return of ctx.Err() in the select case
(the "case <-ctx.Done()" branch in encoding_worker.go) to return
errors.Trace(ctx.Err()) so the context error is wrapped and stack traces are
preserved; add the appropriate import for the errors package if missing and
ensure the change compiles (replace `return ctx.Err()` with `return
errors.Trace(ctx.Err())` in that branch).


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6c214174-6636-4b9f-8f6b-4846f02f86f7

📥 Commits

Reviewing files that changed from the base of the PR and between e29004d and 71216c4.

📒 Files selected for processing (1)
  • pkg/redo/writer/memory/encoding_worker.go

@wk989898
Collaborator Author

wk989898 commented Mar 5, 2026

/retest

Signed-off-by: wk989898 <nhsmwk@gmail.com>
require.ErrorIs(b, eg.Wait(), context.Canceled)
}

type mockBatchWriter struct {
Collaborator


use gomock to generate

@ti-chi-bot ti-chi-bot bot added the needs-1-more-lgtm (indicates a PR needs 1 more LGTM) and approved labels Mar 5, 2026
@wk989898

wk989898 commented Mar 5, 2026

/test all

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot

ti-chi-bot bot commented Mar 5, 2026

@wk989898: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
| --- | --- | --- | --- | --- |
| pull-cdc-pulsar-integration-heavy | e43775b | link | false | /test pull-cdc-pulsar-integration-heavy |

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot ti-chi-bot bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Mar 6, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pkg/redo/writer/memory/file_worker.go (2)

377-380: ⚠️ Potential issue | 🟡 Minor

Wrap the write error with errors.Trace(err).

file.writer.Write(data) may delegate to the lz4 writer (third-party library). Per coding guidelines, errors from library calls should be wrapped to attach stack traces.

🔧 Proposed fix
 	_, err = file.writer.Write(data)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/memory/file_worker.go` around lines 377 - 380, The error
returned from file.writer.Write(data) should be wrapped with errors.Trace before
returning; in the block that checks "if err != nil { return err }" replace the
bare return with "return errors.Trace(err)" and ensure the errors package used
by the project (e.g., errors.Trace) is imported in file_worker.go. This change
applies to the code around file.writer.Write in the file_worker.go writer
routine.

282-293: ⚠️ Potential issue | 🟡 Minor

Wrap errors from library calls with errors.Trace(err).

Both file.writer.Close() (which may call the lz4 closer) and extStorage.WriteFile are library calls. Per coding guidelines, these errors should be wrapped to attach stack traces.

🔧 Proposed fix
 	if err = file.writer.Close(); err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	if util.GetOrZero(f.cfg.FlushConcurrency) <= 1 {
 		err = f.extStorage.WriteFile(egCtx, file.filename, file.writer.buf.Bytes())
 	} else {
 		err = f.multiPartUpload(egCtx, file)
 	}
 	f.metricFlushAllDuration.Observe(time.Since(start).Seconds())
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/memory/file_worker.go` around lines 282 - 293, The error
returns from library calls in file_worker.go need to be wrapped with
errors.Trace(err) to preserve stack traces: wrap the result of
file.writer.Close() and any error returned from f.extStorage.WriteFile (and, if
applicable, errors from f.multiPartUpload()) with errors.Trace before returning;
update the error handling around the file.writer.Close(), the branch that
assigns err = f.extStorage.WriteFile(egCtx, file.filename,
file.writer.buf.Bytes()), and the multiPartUpload call so any non-nil err is
returned as errors.Trace(err) instead of raw err.
♻️ Duplicate comments (1)
pkg/redo/writer/memory/file_worker.go (1)

241-245: ⚠️ Potential issue | 🟠 Major

Handle closed inputCh as a normal shutdown path.

The channel receive doesn't use the comma-ok idiom to detect channel closure. When the upstream encoding worker group closes inputCh during shutdown, this code treats it as an unexpected error rather than a normal termination signal.

🔧 Proposed fix
-		case event := <-inputCh:
-			if event == nil {
-				log.Error("inputCh of redo file worker is closed unexpectedly")
-				return errors.ErrUnexpected.FastGenByArgs("inputCh of redo file worker is closed unexpectedly")
-			}
+		case event, ok := <-inputCh:
+			if !ok {
+				if err := egCtx.Err(); err != nil {
+					return errors.Trace(err)
+				}
+				log.Info("redo file worker input channel closed",
+					zap.String("keyspace", f.cfg.ChangeFeedID.Keyspace()),
+					zap.String("changefeed", f.cfg.ChangeFeedID.Name()))
+				return nil
+			}
+			if event == nil {
+				log.Warn("received nil redo event in file worker",
+					zap.String("keyspace", f.cfg.ChangeFeedID.Keyspace()),
+					zap.String("changefeed", f.cfg.ChangeFeedID.Name()))
+				continue
+			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/memory/file_worker.go` around lines 241 - 245, The receive
from inputCh in the redo file worker currently treats a closed channel as an
unexpected error; change the receive to use the comma-ok form (e.g., event, ok
:= <-inputCh) and if ok is false treat it as a normal shutdown (return nil or
the function's normal exit) instead of logging/errors via
errors.ErrUnexpected.FastGenByArgs; update the branch that currently logs
"inputCh of redo file worker is closed unexpectedly" to perform a clean shutdown
path so upstream closure is not reported as an error.
🧹 Nitpick comments (3)
downstreamadapter/sink/pulsar/sink.go (1)

448-449: Same ineffective buffer = buffer[:0] as in Kafka sink.

This reassignment doesn't affect the caller's msgsBuf. Consider removing it for clarity, or document why it's intentionally a local reset.

🧹 Suggested cleanup
 		msgs, ok := s.rowChan.GetMultipleNoGroup(buffer)
 		if !ok {
 			log.Info("pulsar sink row event channel closed",
 				zap.String("keyspace", s.changefeedID.Keyspace()),
 				zap.String("changefeed", s.changefeedID.Name()))
 			return nil, nil
 		}
-		buffer = buffer[:0]
 		return msgs, nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/pulsar/sink.go` around lines 448 - 449, The local
assignment buffer = buffer[:0] is ineffective because it only resets the local
slice header and doesn't clear the caller's msgsBuf; remove that line (or
replace it with explicit documentation if the intent was to be a no-op) and
simply return msgs, nil from the function; locate and edit the code around the
buffer, msgs, and msgsBuf usage in sink.go to delete the redundant reset so
behavior and intent are clear.
downstreamadapter/sink/kafka/sink.go (1)

345-346: buffer = buffer[:0] has no effect here.

Since buffer is a function parameter (slice header passed by value), reassigning it doesn't affect the caller's msgsBuf. The caller's slice already has len=0 on each iteration because slice headers are passed by value and msgsBuf was initialized once with len=0.

This line can be safely removed as it's a no-op. The code works correctly, but the statement is misleading about its purpose.

🧹 Suggested cleanup
 		msgs, ok := s.rowChan.GetMultipleNoGroup(buffer)
 		if !ok {
 			log.Info("kafka sink event channel closed",
 				zap.String("keyspace", s.changefeedID.Keyspace()),
 				zap.String("changefeed", s.changefeedID.Name()))
 			return nil, nil
 		}
-		buffer = buffer[:0]
 		return msgs, nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/kafka/sink.go` around lines 345 - 346, The statement
`buffer = buffer[:0]` inside the function (the parameter named `buffer` which is
the slice header passed by value) is a no-op and misleading; remove that line
from the end of the function (the same block that returns `msgs, nil`) so the
function simply returns `msgs, nil`. Ensure you do not attempt to mutate the
caller's `msgsBuf` here — rely on the caller to manage `msgsBuf` length/capacity
as before.
pkg/redo/writer/memory/file_worker.go (1)

317-321: Consider propagating the original error from the write failure.

Currently, when wr.Write(data) fails, the error is logged but the function returns nil, causing callers to generate a generic ErrRedoWriterStopped error. The original error context is lost in the error chain.

This is a suggested improvement for better error diagnostics, not blocking for this PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/memory/file_worker.go` around lines 317 - 321, The write
error is being logged but discarded; when wr.Write(data) fails you should
propagate the original error (or wrap it) instead of returning nil so callers
don't lose context and end up with ErrRedoWriterStopped; modify the code path
that currently does `_, err := wr.Write(data)` to return the original err (or a
wrapped error mentioning the operation) after logging (reference wr.Write(data),
log.Error(...) and ErrRedoWriterStopped) so the real failure travels up the call
chain.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 38674758-a1a5-419a-a441-50f195e72c99

📥 Commits

Reviewing files that changed from the base of the PR and between 71216c4 and 8571a24.

📒 Files selected for processing (7)
  • downstreamadapter/sink/kafka/sink.go
  • downstreamadapter/sink/pulsar/sink.go
  • downstreamadapter/sink/redo/sink_test.go
  • pkg/redo/writer/memory/encoding_worker.go
  • pkg/redo/writer/memory/file_worker.go
  • pkg/redo/writer/writer_mock.go
  • scripts/generate-mock.sh
✅ Files skipped from review due to trivial changes (1)
  • pkg/redo/writer/writer_mock.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/redo/writer/memory/encoding_worker.go

Comment on lines +234 to 237
rowsCount := event.Len()
rowCallback := toRowCallback(event.PostTxnFlushed, uint64(rowsCount))
events := make([]*commonEvent.MQRowEvent, 0, rowsCount)


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "*.go" | xargs grep -l "type DMLEvent struct" | head -5

Repository: pingcap/ticdc

Length of output: 88


🏁 Script executed:

ast-grep --pattern $'func ($_ *DMLEvent) Len() $_ {
  $$$
}'

Repository: pingcap/ticdc

Length of output: 210


🏁 Script executed:

sed -n '230,240p' downstreamadapter/sink/kafka/sink.go

Repository: pingcap/ticdc

Length of output: 292


Fix compilation error: make() requires int type for capacity, but event.Len() returns int32.

Line 234 assigns rowsCount := event.Len(), making rowsCount type int32. Line 236 then uses rowsCount as the capacity argument to make([]*commonEvent.MQRowEvent, 0, rowsCount), which requires int. Add an explicit conversion:

rowsCount := int(event.Len())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/kafka/sink.go` around lines 234 - 237, The variable
rowsCount is inferred as int32 from event.Len(), but make(..., 0, rowsCount)
requires an int; change the assignment to convert the length to int (e.g., set
rowsCount using int(event.Len())) so the capacity passed to
make([]*commonEvent.MQRowEvent, 0, rowsCount) is the correct type and update any
dependent call sites like toRowCallback(event.PostTxnFlushed, uint64(rowsCount))
as needed.

@ti-chi-bot ti-chi-bot bot added the lgtm label Mar 6, 2026
@ti-chi-bot

ti-chi-bot bot commented Mar 6, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, asddongmen

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details

Needs approval from an approver in each of these files:
  • OWNERS [3AceShowHand,asddongmen]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot removed the needs-1-more-lgtm Indicates a PR needs 1 more LGTM. label Mar 6, 2026
@ti-chi-bot

ti-chi-bot bot commented Mar 6, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-03-05 09:03:00.288270883 +0000 UTC m=+435224.866350067: ☑️ agreed by 3AceShowHand.
  • 2026-03-06 07:40:13.219196957 +0000 UTC m=+516657.797276141: ☑️ agreed by asddongmen.

Labels

approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
