
add task indexer #4364

Open
3AceShowHand wants to merge 8 commits into pingcap:master from 3AceShowHand:migrate-encoder-group-remove-defragmenter

Conversation

@3AceShowHand
Collaborator

@3AceShowHand 3AceShowHand commented Mar 6, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Refactor

    • Reworked cloud storage sink to a task-based, multi-queue encoder/writer pipeline for better parallelism and reliability.
    • Simplified drain/flushing semantics and improved encoder→writer orchestration.
  • New Features

    • Added table version and dispatcher metadata to cloud-storage path formats to improve routing and storage layout.
  • Tests

    • Expanded tests for encoder routing, drain markers, and writer shutdown/exit behavior.

@ti-chi-bot ti-chi-bot bot added the do-not-merge/needs-linked-issue and release-note (Denotes a PR that will be considered when it comes time to generate release notes.) labels Mar 6, 2026
@ti-chi-bot

ti-chi-bot bot commented Mar 6, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull request has been approved by: (no approvals yet)
Once this PR has been reviewed and has the lgtm label, please assign tenfyzhong for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL (Denotes a PR that changes 1000+ lines, ignoring generated files.) label Mar 6, 2026
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural improvement to the cloud storage sink by replacing its event processing mechanism. The previous defragmentation logic has been superseded by a new task-based pipeline that leverages task indexing to manage event flow. This new design enhances parallelism in encoding and writing operations while strictly preserving event ordering per dispatcher. Additionally, the handling of file paths and schema versions has been centralized and optimized with a new state management system, contributing to a more robust and maintainable codebase.
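To make the routing rules concrete, here is a minimal hypothetical sketch of the two-queue idea described above. All names (taskIndexer, next) are assumptions drawn from this summary, not the actual implementation: encoder input shards rotate round-robin for parallelism, while the writer output shard is a stable hash of the dispatcher ID so per-dispatcher ordering is preserved.

package main

import (
	"fmt"
	"hash/fnv"
	"sync/atomic"
)

// taskIndexer sketches the routing split: round-robin for encoder inputs,
// stable hashing for writer outputs.
type taskIndexer struct {
	inputShards  uint64
	outputShards uint64
	rr           atomic.Uint64 // round-robin counter for encoder inputs
}

func (ix *taskIndexer) next(dispatcherID string) (inputShard, outputShard int) {
	in := ix.rr.Add(1) % ix.inputShards
	h := fnv.New64a()
	h.Write([]byte(dispatcherID))
	out := h.Sum64() % ix.outputShards // same dispatcher -> same writer shard
	return int(in), int(out)
}

func main() {
	ix := &taskIndexer{inputShards: 4, outputShards: 8}
	for i := 0; i < 3; i++ {
		in, out := ix.next("dispatcher-A")
		// The input shard rotates; the output shard never changes.
		fmt.Printf("input=%d output=%d\n", in, out)
	}
}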

Highlights

  • Refactored Cloud Storage Sink Pipeline: The internal event processing pipeline for the cloud storage sink has been completely overhauled, replacing the previous defragmenter and eventFragment model with a new task and taskIndexer system for improved efficiency and clarity.
  • Enhanced Parallelism and Ordering: A multi-stage pipeline has been implemented, featuring dedicated encoder shards and writer shards. This design allows for parallel processing of events while strictly maintaining per-dispatcher ordering, ensuring data consistency.
  • Centralized Path Handling and State Management: File path generation and indexing logic have been refactored and centralized. This includes robust schema version management and a new state cleanup mechanism to efficiently manage file indexing state.
  • Improved Observability with New Metrics: New metrics have been introduced to track flush tasks, their durations by reason, and file sizes. The CloudStorageWorkerBusyRatio metric has been renamed and updated to CloudStorageShardBusySeconds for better clarity and granularity.
  • Codebase Reorganization: Consumer-specific path key definitions have been moved to cmd/storage-consumer, while general path logic resides in pkg/sink/cloudstorage, improving modularity and separation of concerns.


Changelog
  • cmd/storage-consumer/consumer.go
    • Refactored event processing logic, removing DDL watermark and extracting path handling functions.
    • Updated type references from cloudstorage.FileIndexKey and cloudstorage.DmlPathKey to local FileIndexKey and DmlPathKey.
  • cmd/storage-consumer/discovery.go
    • Added new file to encapsulate DML file discovery and parsing logic, previously located in consumer.go.
  • cmd/storage-consumer/main.go
    • Added a check to skip version logging during testing processes.
  • cmd/storage-consumer/path_key.go
    • Added new file to define and handle storage path keys (SchemaPathKey, FileIndexKey, FileIndex, DmlPathKey) specific to the consumer.
  • cmd/storage-consumer/path_key_test.go
    • Added unit tests for consumer-specific path key logic.
  • downstreamadapter/sink/cloudstorage/defragmenter.go
    • Removed, as its functionality was replaced by the new task indexing system.
  • downstreamadapter/sink/cloudstorage/defragmenter_test.go
    • Removed, corresponding to the removal of the defragmenter.
  • downstreamadapter/sink/cloudstorage/dml_writers.go
    • Updated to use the new task-based pipeline, removing defragmenter integration and eventFragment types.
    • Refactored Run method to introduce submitTaskToEncoder and dispatchTaskToWriter for managing the new pipeline stages.
    • Added PassBlockEvent method for backward compatibility.
  • downstreamadapter/sink/cloudstorage/encoding_group.go
    • Refactored to support parallel encoding with multiple input and output channels.
    • Introduced taskIndexer for routing tasks and taskFuture for managing asynchronous encoding results.
    • Added Add and ConsumeOutputShard methods for interacting with the encoding group.
  • downstreamadapter/sink/cloudstorage/encoding_group_test.go
    • Added unit tests for the new encoding group and task indexing logic.
  • downstreamadapter/sink/cloudstorage/sink.go
    • Updated overview comments to reflect the new pipeline architecture.
    • Adjusted DML/DDL event handling and checkpoint logic to align with the new task-based pipeline.
    • Removed redundant logging statements.
  • downstreamadapter/sink/cloudstorage/task.go
    • Added new file to define taskKind, task struct, and drainMarker for the new event processing model.
  • downstreamadapter/sink/cloudstorage/task_indexer.go
    • Added new file to manage routing of tasks to encoder and writer shards, ensuring stable ordering per dispatcher.
  • downstreamadapter/sink/cloudstorage/writer.go
    • Updated to process tasks from the new pipeline, refining flush reasons and improving logging.
    • Changed inputCh type to *chann.DrainableChann[*task] and introduced enqueueTask and closeInput methods.
  • downstreamadapter/sink/cloudstorage/writer_test.go
    • Updated unit tests for the writer to reflect task-based processing and new enqueue methods.
  • downstreamadapter/sink/metrics/cloudstorage.go
    • Added new metrics: CloudStorageFlushTaskCounter, CloudStorageFlushDurationByReasonHistogram, and CloudStorageFlushFileSizeHistogram.
    • Renamed CloudStorageWorkerBusyRatio to CloudStorageShardBusySeconds and updated its usage.
  • pkg/sink/cloudstorage/path.go
    • Refactored path generation, adding tablePathStateKey and tablePathState for improved state management of file indexing.
    • Updated FilePathGenerator to use a pathState map and introduced methods for state cleanup and schema version tracking.
    • Modified GenerateDataFilePath and FetchIndexFromFileName to align with the new state management and file naming conventions.
  • pkg/sink/cloudstorage/path_key.go
    • Removed, as its functionality was moved to cmd/storage-consumer/path_key.go.
  • pkg/sink/cloudstorage/path_key_test.go
    • Removed, corresponding to the removal of path_key.go.
  • pkg/sink/cloudstorage/path_test.go
    • Updated unit tests for path generation and file indexing to reflect the new state management and FetchIndexFromFileName logic.

@coderabbitai
Contributor

coderabbitai bot commented Mar 6, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

This PR removes the defragmenter and replaces fragment-based processing with a unified task model, introducing encoderGroup/encodingGroup components, an indexer, and task-driven pipelines that route tasks through encoders to per-shard writers; related tests and path/schema structs are updated accordingly.

Changes

Cohort / File(s) Summary
Defragmenter Removal
downstreamadapter/sink/cloudstorage/defragmenter.go, downstreamadapter/sink/cloudstorage/defragmenter_test.go
Deleted the defragmenter implementation and its test; sequencing, out-of-order buffering, and drain-marker dispatch removed.
Task Model
downstreamadapter/sink/cloudstorage/task.go, downstreamadapter/sink/cloudstorage/task_test.go
Added internal task type (DML/drain), drainMarker with finish/wait semantics, and tests for drain wait behavior.
Encoder / Encoding
downstreamadapter/sink/cloudstorage/encoder_group.go, downstreamadapter/sink/cloudstorage/encoding_group.go, downstreamadapter/sink/cloudstorage/encoder_group_test.go
New encoderGroup/encodingGroup components implementing two-queue model (per-encoder input shards, per-output shards), futures/taskFuture, Add/ConsumeOutputShard APIs, and tests for routing, encoding, and cancellation.
Indexer & Routing
downstreamadapter/sink/cloudstorage/indexer.go
Introduced indexer for round-robin input routing and stable per-dispatcher output shard hashing.
DML Writers & Orchestration
downstreamadapter/sink/cloudstorage/dml_writers.go, downstreamadapter/sink/cloudstorage/dml_writers_test.go
Replaced eventFragment flow with *task; removed defragmenter references; added encoderGroup orchestration, runCtx management, task submission/dispatch, and exit-on-close test.
Writer Changes
downstreamadapter/sink/cloudstorage/writer.go, downstreamadapter/sink/cloudstorage/writer_test.go
Writer now consumes *task via internal input channel, introduces writerTask/batchedTask, simplifies file-write signatures, and updates tests to use task-based enqueue/close semantics.
Sink Integration & Tests
downstreamadapter/sink/cloudstorage/sink.go, downstreamadapter/sink/cloudstorage/sink_test.go
Sink delegates to unexported dmlWriters.run/add methods; adjusted checkpoint/DDL logging and added FlushDMLBeforeBlock-without-run test.
Path / Schema Model
pkg/sink/cloudstorage/path.go, pkg/sink/cloudstorage/path_key.go, pkg/sink/cloudstorage/table_definition.go, pkg/sink/cloudstorage/path_test.go
Added DispatcherID/TableVersion and other public fields to path/schema structs; changed CheckOrWriteSchema behavior to reuse latest schema file instead of writing a new one; updated tests accordingly.
Misc
pkg/common/event/dml_event.go
Clarified comment for DMLEvent.TableInfoVersion (documentation-only change).

Sequence Diagram(s)

sequenceDiagram
    participant Sink
    participant DMLWriters
    participant EncoderGroup
    participant Writer
    participant Storage

    Sink->>DMLWriters: AddDMLEvent(event)
    DMLWriters->>DMLWriters: newDMLTask(event)
    DMLWriters->>EncoderGroup: Add(task)
    EncoderGroup->>EncoderGroup: route to input shard (round-robin)
    EncoderGroup->>EncoderGroup: runEncoder encodes -> sets encodedMsgs
    EncoderGroup->>EncoderGroup: route to output shard (stable hash)
    DMLWriters->>EncoderGroup: ConsumeOutputShard(shard)
    EncoderGroup-->>DMLWriters: ready taskFuture
    DMLWriters->>Writer: dispatchTaskToWriter(task)
    Writer->>Writer: genAndDispatchTask (batched writerTask)
    Writer->>Storage: writeDataFile / writeIndexFile
    Writer-->>DMLWriters: marker.finish() / task ack
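The marker.finish() step at the end of the diagram corresponds to the drainMarker finish/wait semantics mentioned in the walkthrough. A minimal hedged sketch (all names and the WaitGroup-based shape are assumptions, not the actual code): one finish() is expected from every writer shard, and wait() unblocks the producer once all shards have flushed past the marker.

package main

import (
	"fmt"
	"sync"
)

// drainMarker: sketch of finish/wait drain semantics.
type drainMarker struct {
	pending sync.WaitGroup
}

func newDrainMarker(shards int) *drainMarker {
	m := &drainMarker{}
	m.pending.Add(shards) // one finish() expected per writer shard
	return m
}

func (m *drainMarker) finish() { m.pending.Done() }
func (m *drainMarker) wait()   { m.pending.Wait() }

func main() {
	const shards = 4
	m := newDrainMarker(shards)
	for i := 0; i < shards; i++ {
		go m.finish() // each writer shard acknowledges after flushing
	}
	m.wait()
	fmt.Println("drain complete")
}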

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

lgtm, approved

Suggested reviewers

  • wk989898
  • flowbehappy
  • wlwilliamx

Poem

🐰 Hop hop, fragments turned to tidy tasks,
Encoders spin in queues with steady asks,
Round‑robin to shards, each dispatcher found,
Writers write files while futures softly sound,
A rabbit cheers the pipeline's graceful dance.

🚥 Pre-merge checks | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

  • Description check — ⚠️ Warning: The PR description is largely incomplete, containing only the repository template with no implementation details, rationale, issue reference, test information, or answers to the required questions. Resolution: complete the PR description by (1) providing a concrete issue reference instead of 'close #xxx', (2) explaining what changed and how it works, (3) selecting and detailing the test types used, (4) answering the performance/compatibility and documentation questions, and (5) providing a meaningful release note or 'None'.
  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 12.07%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Title check — ❓ Inconclusive: The title 'add task indexer' is vague and does not convey meaningful information about the changeset. Resolution: provide a more specific title, such as 'Refactor cloud storage sink to use task-based processing with encoder groups' or 'Replace defragmenter with task indexer for cloud storage sink pipeline'.



@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant refactoring of the cloud storage sink's internal data processing pipeline, replacing the defragmenter with a more robust taskIndexer and a task-based queueing model. This is a great improvement for scalability and maintainability. Additionally, some path parsing logic has been moved to the storage-consumer application, which is a good separation of concerns. However, I've identified a few potential regressions related to state management and race conditions that could impact data correctness and checkpointing. Please see my detailed comments.

Note: Security Review did not run due to the size of the PR.

I am having trouble creating individual review comments, so my feedback is listed below.

cmd/storage-consumer/consumer.go (545-605)

high

The logic for tableDDLWatermark has been removed in this refactoring. This watermark was crucial for preventing the processing of DML events against a stale schema. For instance, if a DML file for an older schema version arrives after a DDL for a newer version has already been processed, the old logic would correctly ignore this stale DML. The new implementation lacks this check, which could lead to data corruption or processing errors. This appears to be a regression. Please consider re-introducing a mechanism to track the latest applied DDL version per table and filter out stale DML events.
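A hedged sketch of such a filter; all names here are invented for illustration (the removed tableDDLWatermark may have been shaped differently), and only the standard sync package is assumed:

type tableKey struct{ schema, table string }

// ddlWatermarks tracks the newest schema version applied per table so DML
// files encoded under an older version can be skipped.
type ddlWatermarks struct {
	mu       sync.Mutex
	versions map[tableKey]uint64
}

func newDDLWatermarks() *ddlWatermarks {
	return &ddlWatermarks{versions: make(map[tableKey]uint64)}
}

func (w *ddlWatermarks) applyDDL(k tableKey, version uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if version > w.versions[k] {
		w.versions[k] = version
	}
}

// shouldSkipDML reports whether a DML file's schema version is older than
// the newest DDL already applied to that table.
func (w *ddlWatermarks) shouldSkipDML(k tableKey, fileVersion uint64) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return fileVersion < w.versions[k]
}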

downstreamadapter/sink/cloudstorage/sink.go (291-293)

high

The check if checkpoint < s.lastCheckpointTs.Load() has been removed. This check prevented writing an older checkpoint timestamp if a newer one has already been written, which could happen if checkpoint messages arrive out of order in checkpointChan. While out-of-order arrival might be unlikely, this safeguard was important for correctness. Could you clarify if out-of-order checkpoints are impossible now? If not, it would be safer to restore this check to prevent checkpoint regression.

pkg/sink/cloudstorage/path.go (351-368)

high

The new implementation of GenerateDataFilePath (and its helper generateDataFileName) no longer checks for the existence of the generated data file path. The previous implementation had a recursive call that would re-generate a path if a file with the same name already existed. This was a safeguard against race conditions, for example, when two different captures might temporarily attempt to write to the same logical path after a failover. The new logic relies on in-memory state that is initialized from the index file, but this could still lead to two processes generating the same filename if they both initialize from the same index file. This could lead to data loss or corruption. Please consider re-introducing a mechanism to handle potential file name collisions.
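A hedged sketch of the existence re-check being asked for; the fileChecker interface, the helper name, and the CDC%0*d.json naming are illustrative assumptions (fmt is assumed imported), not the project's real API:

type fileChecker interface {
	FileExists(path string) (bool, error)
}

// nextDataFilePath advances the index until the candidate file does not
// already exist, so a concurrent writer (e.g. after failover) that
// initialized from the same index file cannot claim the same name.
func nextDataFilePath(fc fileChecker, dir string, idx uint64, width int) (string, uint64, error) {
	for {
		idx++
		candidate := fmt.Sprintf("%s/CDC%0*d.json", dir, width, idx)
		exists, err := fc.FileExists(candidate)
		if err != nil {
			return "", 0, err
		}
		if !exists {
			return candidate, idx, nil
		}
		// Collision: another process already owns this index; keep going.
	}
}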

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
cmd/storage-consumer/main.go (1)

59-74: ⚠️ Potential issue | 🟠 Major

Keep flag/default initialization active in tests.

Returning here before the StringVar/DurationVar calls leaves flushInterval=0, fileIndexWidth=0, timezone="", and upstreamURI=nil. Same-package tests that call handle() can now panic on time.NewTicker(0), and newConsumer() starts from different defaults than production. Only skip the runtime side effects in tests; keep the flag/default setup.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/storage-consumer/main.go` around lines 59 - 74, The init() function
currently returns early when isTestingProcess() is true, skipping flag/default
initialization and leaving flushInterval, fileIndexWidth, timezone, and
upstreamURIStr unset; modify init() so it still runs the
flag.StringVar/DurationVar/IntVar/BoolVar calls (keeping default values) even in
tests, but only skip side-effects (e.g., avoid starting runtime profiling or
other immediate runtime actions) when isTestingProcess() is true; ensure
references to init(), isTestingProcess(), flushInterval, fileIndexWidth,
timezone, upstreamURIStr, newConsumer, and handle are used to locate and verify
that flag setup remains executed while runtime-only operations are conditionally
skipped.
downstreamadapter/sink/cloudstorage/sink.go (1)

291-318: ⚠️ Potential issue | 🟠 Major

Reject stale checkpointTs before overwriting metadata.

The checkpoint handler forwards values straight into AddCheckpointTs, so this method still needs a local monotonicity guard. Without it, a late older value can overwrite a newer checkpoint file and move recovery state backwards.

🩹 Minimal guard
+		lastCheckpointTs := s.lastCheckpointTs.Load()
+		if checkpoint <= lastCheckpointTs {
+			log.Debug("skip stale checkpoint ts",
+				zap.String("keyspace", s.changefeedID.Keyspace()),
+				zap.String("changefeed", s.changefeedID.Name()),
+				zap.Uint64("checkpoint", checkpoint),
+				zap.Uint64("lastCheckpoint", lastCheckpointTs))
+			continue
+		}
+
 		if time.Since(s.lastSendCheckpointTsTime) < 2*time.Second {
 			log.Warn("skip write checkpoint ts to external storage",
 				zap.Any("changefeedID", s.changefeedID),
 				zap.Uint64("checkpoint", checkpoint))
 			continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/sink.go` around lines 291 - 318, Reject
stale checkpoint values before writing metadata by comparing the incoming
checkpoint with the local stored value: load current :=
s.lastCheckpointTs.Load() (or call s.lastCheckpointTs.Load().(uint64) as used)
and if checkpoint <= current then log a warning and skip the write/return
without updating storage or s.lastSendCheckpointTsTime; place this check in the
same method that performs the write (the code block that sets start :=
time.Now(), marshals the message, calls s.storage.WriteFile and then
s.lastCheckpointTs.Store) so late/older values cannot overwrite a newer
checkpoint file.
🧹 Nitpick comments (2)
downstreamadapter/sink/metrics/cloudstorage.go (1)

89-96: Breaking change: metric name and labels modified.

The metric name changed to cloud_storage_shard_busy_seconds_total and now includes an additional "id" label. This will break existing Prometheus dashboards/alerts that query the previous metric name. Ensure operational monitoring configurations are updated accordingly.
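If backward compatibility is chosen, a hedged sketch of dual registration during a deprecation window; the label names are assumptions, and github.com/prometheus/client_golang/prometheus is required:

var (
	shardBusySeconds = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "cloud_storage_shard_busy_seconds_total",
		Help:      "Total seconds each cloud storage shard spent busy.",
	}, []string{"namespace", "changefeed", "id"})

	// Deprecated: kept only so existing dashboards keep returning data.
	legacyWorkerBusyRatio = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "cloud_storage_worker_busy_ratio",
		Help:      "Deprecated: use cloud_storage_shard_busy_seconds_total.",
	}, []string{"namespace", "changefeed"})
)

// addBusySeconds updates both metrics during the deprecation window.
func addBusySeconds(ns, cf, id string, seconds float64) {
	shardBusySeconds.WithLabelValues(ns, cf, id).Add(seconds)
	legacyWorkerBusyRatio.WithLabelValues(ns, cf).Add(seconds)
}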

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/metrics/cloudstorage.go` around lines 89 - 96, The
metric CloudStorageShardBusySeconds was renamed to
"cloud_storage_shard_busy_seconds_total" and gained an "id" label, breaking
existing dashboards; either revert the change in CloudStorageShardBusySeconds to
the original metric name and label set, or maintain backward compatibility by
adding a second CounterVec (e.g., OldCloudStorageShardBusySeconds) that uses the
original metric name and labels and update all increments to update both metrics
(CloudStorageShardBusySeconds and OldCloudStorageShardBusySeconds); also add a
brief code comment explaining the deprecation and coordinate updating
operational monitoring configs if you keep the new metric.
cmd/storage-consumer/discovery.go (1)

64-79: This poll loop rescans the full storage root every interval.

Each round deep-copies tableDMLIdxMap and walks SubDir: "", so the cost grows with total historical files, not just new ones. If this consumer is expected to run continuously against large buckets, consider narrowing the walk to active prefixes or persisting a frontier/task index so old objects are not relisted every poll.

Also applies to: 103-104
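A hedged sketch of the narrowing idea; the walker interface abstracts c.externalStorage.WalkDir, and the prefix/seen bookkeeping is an assumption rather than code taken from the repository (context is assumed imported):

type walker interface {
	WalkPrefix(ctx context.Context, prefix string, fn func(path string, size int64) error) error
}

// scanNewFiles walks only known-active table prefixes and skips paths already
// seen, so cost tracks new objects rather than the whole bucket history.
func scanNewFiles(ctx context.Context, w walker, activePrefixes []string, seen map[string]struct{}) ([]string, error) {
	var fresh []string
	for _, prefix := range activePrefixes {
		err := w.WalkPrefix(ctx, prefix, func(path string, _ int64) error {
			if _, ok := seen[path]; !ok {
				seen[path] = struct{}{}
				fresh = append(fresh, path)
			}
			return nil
		})
		if err != nil {
			return nil, err
		}
	}
	return fresh, nil
}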

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/storage-consumer/discovery.go` around lines 64 - 79, getNewFiles
currently deep-copies tableDMLIdxMap into origDMLIdxMap and calls
c.externalStorage.WalkDir with WalkOption{SubDir: ""}, causing a full-bucket
rescan every poll; change the logic to only walk active prefixes (or pass a
persisted frontier/poll cursor) instead of SubDir: "", and avoid copying the
entire tableDMLIdxMap each iteration by using a lightweight snapshot of only
active prefixes or persisted last-seen index metadata; update references in
getNewFiles, the origDMLIdxMap construction, and the call to
c.externalStorage.WalkDir/WalkOption to accept a narrower prefix or cursor so
only new objects are listed on subsequent polls.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/cloudstorage/dml_writers.go`:
- Around line 171-182: The drain wait currently only listens to doneCh and
d.ctx.Done() so it can hang if the pipeline Run goroutine exits with an error;
modify the drain logic (the block that creates doneCh, pushes newDrainTask via
d.msgCh.Push, and waits) to also observe the run/errgroup context or the stored
run error from Run and to fail pending markers on shutdown: when signaling
shutdown (Run returning an error), close or send that error to waiting drain
tasks (the pending doneCh created for newDrainTask) so they unblock with the
appropriate error instead of waiting forever; update the code paths that handle
pipeline termination (the Run goroutine and writer.flushMessages shutdown
cleanup) to propagate the error to all outstanding drain doneChs.

In `@downstreamadapter/sink/cloudstorage/task_indexer.go`:
- Around line 29-31: The dispatcherToShard map in TaskIndexer (protected by mu)
is an unbounded cache keyed by DispatcherID while routing is already
deterministic via routeOutputIndex; remove the global dispatcherToShard cache or
bound it to prevent unbounded growth (e.g., stop storing every seen DispatcherID
and compute shard via routeOutputIndex on-the-fly in functions that reference
dispatcherToShard, or replace the map with a bounded LRU/size-limit eviction and
update cachedOutputCount() to reflect evictions). Locate references to
dispatcherToShard, cachedOutputCount(), and routeOutputIndex in TaskIndexer and
either remove the caching logic and direct-call routeOutputIndex(dispatcherID)
everywhere, or implement an explicit bounded cache with eviction to cap memory
growth.

In `@pkg/sink/cloudstorage/path.go`:
- Around line 356-364: generateDataDirPath currently recreates an evicted schema
state using tbl.TableInfoVersion (via ensurePathState/updateSchemaVersion),
which allows an idle table to revert to a different version directory after
cleanupExpiredPathState; instead preserve the already-resolved schemaVersion
across TTL cleanup or perform a fresh schema lookup before recreating state:
modify FilePathGenerator.generateDataDirPath (and the same logic around lines
494-510) to fetch/retain the resolved schemaVersion outside the path-state cache
eviction and only call updateSchemaVersion when a genuine fresh lookup confirms
a new version, or detect eviction and re-resolve the schema (not just use
tbl.TableInfoVersion) before recreating the state via
ensurePathState/updateSchemaVersion.
- Around line 128-131: The keying type tablePathStateKey currently uses
TableName and DispatcherID which can differ from the actual emitted
directory/filename namespaces (e.g., when EnablePartitionSeparator=false or
EnableTableAcrossNodes=false), causing duplicate filenames and stale index
reads; change the keying to use the actual emitted namespace identifiers (for
example the computed directory namespace and the filename namespace or the full
generated path string) instead of TableName/DispatcherID—update the definition
of tablePathStateKey (and any usages in functions that build/lookup path state,
including the code referenced around the path generation logic and the locations
noted at lines ~467-471) so lookups/updates use the same canonical emitted path
components produced by the path-generation function.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c43ed937-7f3c-4a07-a948-5f501f21c2ed

📥 Commits

Reviewing files that changed from the base of the PR and between 29db6eb and a37f98b.

📒 Files selected for processing (20)
  • cmd/storage-consumer/consumer.go
  • cmd/storage-consumer/discovery.go
  • cmd/storage-consumer/main.go
  • cmd/storage-consumer/path_key.go
  • cmd/storage-consumer/path_key_test.go
  • downstreamadapter/sink/cloudstorage/defragmenter.go
  • downstreamadapter/sink/cloudstorage/defragmenter_test.go
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • downstreamadapter/sink/cloudstorage/encoding_group.go
  • downstreamadapter/sink/cloudstorage/encoding_group_test.go
  • downstreamadapter/sink/cloudstorage/sink.go
  • downstreamadapter/sink/cloudstorage/task.go
  • downstreamadapter/sink/cloudstorage/task_indexer.go
  • downstreamadapter/sink/cloudstorage/writer.go
  • downstreamadapter/sink/cloudstorage/writer_test.go
  • downstreamadapter/sink/metrics/cloudstorage.go
  • pkg/sink/cloudstorage/path.go
  • pkg/sink/cloudstorage/path_key.go
  • pkg/sink/cloudstorage/path_key_test.go
  • pkg/sink/cloudstorage/path_test.go
💤 Files with no reviewable changes (4)
  • pkg/sink/cloudstorage/path_key_test.go
  • downstreamadapter/sink/cloudstorage/defragmenter.go
  • pkg/sink/cloudstorage/path_key.go
  • downstreamadapter/sink/cloudstorage/defragmenter_test.go

Comment on lines +56 to +61
shardA := indexer.routeOutputIndex(dispatcherA)
shardB := indexer.routeOutputIndex(dispatcherB)
for shardA == shardB {
    dispatcherB = commonType.NewDispatcherID()
    shardB = indexer.routeOutputIndex(dispatcherB)
}
Contributor

⚠️ Potential issue | 🟡 Minor

Fail fast instead of looping on random dispatcher IDs forever.

Both tests keep generating dispatcher IDs until two hashes differ. If the hash function or ID generator behavior changes, these loops hang the suite rather than failing with a clear assertion. Bound the attempts and assert that distinct shards were found.

Also applies to: 82-85
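One possible bounded form, assuming the testify require package already used by this test file (the maxAttempts value is arbitrary):

const maxAttempts = 1000
distinct := false
for i := 0; i < maxAttempts; i++ {
    dispatcherB = commonType.NewDispatcherID()
    shardB = indexer.routeOutputIndex(dispatcherB)
    if shardB != shardA {
        distinct = true
        break
    }
}
require.True(t, distinct, "routeOutputIndex never produced a shard distinct from shardA")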

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoding_group_test.go` around lines 56 -
61, The test currently loops indefinitely calling commonType.NewDispatcherID()
and indexer.routeOutputIndex(dispatcherB) until shardB != shardA; replace that
infinite loop with a bounded retry (e.g., for i := 0; i < maxAttempts; i++) and
stop when routeOutputIndex returns a different shard, then assert/fatal if
maxAttempts is reached so the test fails fast; apply this change to both
occurrences that compare shardA and shardB (the block using
dispatcherA/dispatcherB near routeOutputIndex and the similar block at lines
~82-85) and use t.Fatalf or require.FailNow with a clear message mentioning
routeOutputIndex and NewDispatcherID when no distinct shard is found.

Comment on lines +93 to +97
err := g.Wait()
for _, outCh := range eg.outputCh {
    close(outCh)
}
return err
Contributor

⚠️ Potential issue | 🔴 Critical

Synchronize producer shutdown before closing outputCh.

Add still sends into eg.outputCh[outputIndex], but Run closes those channels as soon as the encoder errgroup returns. On cancellation or encoder failure, submitTaskToEncoder can still be inside Add, which turns the second enqueue into a send-on-closed-channel panic; if the first enqueue already succeeded, the task is also stranded in inputCh with no writer-side consumer. Stop Add / close the input side before the output close loop. A cancel-while-buffered regression test should reproduce this.
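A hedged sketch of that ordering; closeInput is grounded in the PR (dml_writers.go already calls encodeGroup.closeInput()), while addWG is a hypothetical sync.WaitGroup that Add would enter before its first send and leave after its second:

eg.closeInput() // 1. reject new Add calls
eg.addWG.Wait() // 2. wait out any Add that is mid-send
err := g.Wait() // 3. encoders drain whatever is still buffered in inputCh
for _, outCh := range eg.outputCh {
    close(outCh) // 4. now no producer can hit a closed channel
}
return err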

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoding_group.go` around lines 93 - 97,
The shutdown currently waits for encoders (g.Wait()) and then closes
eg.outputCh, causing a race where Add/submitTaskToEncoder can still send to an
output and panic; change the shutdown order to first stop producers/writers by
closing the input side (eg.inputCh or otherwise signal Add to return) and wait
for Add/submitTaskToEncoder to finish, then wait for the encoder errgroup
(g.Wait()) and only after that close each channel in eg.outputCh; update
Run/Close logic so Add checks the input-closed signal and returns instead of
sending, ensuring submitTaskToEncoder cannot be in-flight when you close
eg.outputCh.

Comment on lines +131 to +133
func TestWriterDrainMarker(t *testing.T) {
t.Parallel()

Contributor

⚠️ Potential issue | 🟠 Major

Don't run this test in parallel while the helper modifies global appcontext state.

testWriter registers a mock clock via appcontext.SetService(appcontext.DefaultPDClock, ...), which affects shared global state. Both TestWriterRun and TestWriterDrainMarker call this helper with t.Parallel(), creating a race on the same service registration. Depending on test execution order, one test's clock setup can be overwritten by the other, causing order-dependent failures. Either remove t.Parallel() from one or both tests, or refactor to inject the clock without mutating appcontext.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer_test.go` around lines 131 - 133,
TestWriterDrainMarker and TestWriterRun both call testWriter which mutates
global appcontext via appcontext.SetService(appcontext.DefaultPDClock, ...),
causing a race when both tests use t.Parallel(); remove t.Parallel() from one or
both tests (e.g., delete t.Parallel() in TestWriterDrainMarker) or refactor
testWriter to accept a clock parameter and avoid calling appcontext.SetService
so the clock is injected without mutating global state (adjust callers of
testWriter accordingly).

Comment on lines +453 to +457
tableTask := t.batch[table]
for _, msg := range event.encodedMsgs {
    v.size += uint64(len(msg.Value))
    tableTask.size += uint64(len(msg.Value))
}
v.msgs = append(v.msgs, event.encodedMsgs...)
tableTask.msgs = append(tableTask.msgs, event.encodedMsgs...)
Contributor

⚠️ Potential issue | 🟡 Minor

Include header/key bytes in singleTableTask.size.

handleSingleTableEvent only adds len(msg.Value), but writeDataFile also writes the first msg.Key into the file. That makes size-triggered flushes and CloudStorageFlushFileSizeHistogram undercount the actual file size for formats that emit a non-empty key/header.
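A self-contained sketch of the suggested accounting; the encodedMsg type here is an assumption standing in for the real message type:

type encodedMsg struct {
	Key, Value []byte
}

// accountedSize counts both key/header and value bytes, matching what
// writeDataFile actually emits.
func accountedSize(msgs []encodedMsg) uint64 {
	var total uint64
	for _, m := range msgs {
		total += uint64(len(m.Value))
		if len(m.Key) > 0 {
			total += uint64(len(m.Key))
		}
	}
	return total
}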

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 453 - 457, In
handleSingleTableEvent (where tableTask.size is computed from event.encodedMsgs)
include the key/header bytes in the size calculation: for each msg in
event.encodedMsgs add uint64(len(msg.Key)) in addition to uint64(len(msg.Value))
(guarding for nil/empty keys) so tableTask.size reflects the actual bytes
written by writeDataFile (which writes msg.Key/header) and prevents
undercounting for size-triggered flushes and CloudStorageFlushFileSizeHistogram.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
downstreamadapter/sink/cloudstorage/writer.go (1)

336-341: Consider logging when interval-based flush is skipped due to backpressure.

The default case silently skips the flush when toBeFlushedCh is full. While data is retained and will be flushed on the next opportunity, adding debug-level logging here would improve observability when investigating latency or backpressure issues.

💡 Suggested improvement
 			case d.toBeFlushedCh <- writerTask{batch: batchedTask, reason: flushReasonInterval}:
 				log.Debug("flush task is emitted successfully when flush interval exceeds",
 					zap.Int("tablesLength", len(batchedTask.batch)))
 				batchedTask = newBatchedTask()
-			default:
+			default:
+				log.Debug("flush task skipped due to backpressure",
+					zap.Int("shardID", d.shardID),
+					zap.Int("pendingTables", len(batchedTask.batch)))
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 336 - 341, The
default branch currently drops logging when the send to toBeFlushedCh fails due
to backpressure; update the default case in the select that sends
writerTask{batch: batchedTask, reason: flushReasonInterval} to emit a
debug-level log that includes context (e.g., len(batchedTask.batch) and a note
that the flush was skipped due to a full toBeFlushedCh) so operators can see
interval-based flushes being deferred; keep existing behavior (do not alter
batching or newBatchedTask())—just add the debug log referencing toBeFlushedCh,
writerTask, batchedTask, and flushReasonInterval.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/cloudstorage/writer.go`:
- Around line 452-458: Remove the unused method totalSize() defined on the
batchedTask type: locate the function declaration func (t batchedTask)
totalSize() uint64 and delete it from writer.go, ensuring there are no remaining
references to batchedTask.totalSize in the codebase; run a build and tests to
confirm no compile-time usages break after removal.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3693ce99-4457-4504-af07-f21a8bd9f3e2

📥 Commits

Reviewing files that changed from the base of the PR and between a37f98b and 296232d.

📒 Files selected for processing (1)
  • downstreamadapter/sink/cloudstorage/writer.go

Comment on lines 134 to 136
case <-overseerTicker.C:
    d.metricsWorkerBusyRatio.Add(flushTimeSlice.Seconds())
    flushTimeSlice = 0
Contributor

⚠️ Potential issue | 🟠 Major

Missing isClosed check in ticker case may cause flushMessages to hang on graceful shutdown.

When genAndDispatchTask exits gracefully (returns nil), the errgroup context is not cancelled. Since toBeFlushedCh is never closed, flushMessages will keep running indefinitely, waking up on each overseerTicker tick but never exiting. Only the case task, ok := <-d.toBeFlushedCh branch checks isClosed.

🔧 Proposed fix
 		case <-overseerTicker.C:
+			if atomic.LoadUint64(&d.isClosed) == 1 {
+				return nil
+			}
 			d.metricsWorkerBusyRatio.Add(flushTimeSlice.Seconds())
 			flushTimeSlice = 0
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 134 - 136, The
overseerTicker case in flushMessages doesn't check whether the flusher has been
closed, so flushMessages can loop forever if genAndDispatchTask exits without
cancelling the context; modify the ticker branch in flushMessages (the select
case handling <-overseerTicker.C) to check the same closed condition used for
the toBeFlushedCh branch (e.g., call or inspect d.isClosed / the isClosed flag
or a closed bool set when toBeFlushedCh is closed) and return/exit the function
when closed; ensure you reference the same closure signal used by
genAndDispatchTask (toBeFlushedCh / isClosed) so both the <-d.toBeFlushedCh and
<-overseerTicker.C branches terminate consistently.

@3AceShowHand
Collaborator Author

/test all

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
downstreamadapter/sink/cloudstorage/sink.go (1)

286-291: Consider changing log level for checkpoint skip.

The skip due to time throttling is expected behavior (2-second interval). Using log.Warn for routine throttling may generate excessive noise in production logs.

♻️ Suggested change
-		log.Warn("skip write checkpoint ts to external storage",
+		log.Debug("skip write checkpoint ts to external storage",
 			zap.Any("changefeedID", s.changefeedID),
 			zap.Uint64("checkpoint", checkpoint))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/sink.go` around lines 286 - 291, The
current log.Warn call inside the checkpoint throttle branch (checking
s.lastSendCheckpointTsTime < 2*time.Second) is too noisy for expected behavior;
change the log level to a lower verbosity (e.g., log.Debug or log.Info) and keep
the same context fields (s.changefeedID and checkpoint) so routine 2-second
throttling doesn't flood production logs; locate the warning in sink.go where
s.lastSendCheckpointTsTime, s.changefeedID, and checkpoint are referenced and
replace log.Warn with the chosen lower-level logger while preserving the message
and fields.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 420ab97e-835a-416a-889a-172b5a73fe64

📥 Commits

Reviewing files that changed from the base of the PR and between 296232d and 2e7aaa7.

📒 Files selected for processing (8)
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • downstreamadapter/sink/cloudstorage/encoding_group.go
  • downstreamadapter/sink/cloudstorage/encoding_group_test.go
  • downstreamadapter/sink/cloudstorage/sink.go
  • downstreamadapter/sink/cloudstorage/task.go
  • downstreamadapter/sink/cloudstorage/task_indexer.go
  • downstreamadapter/sink/cloudstorage/writer.go
  • downstreamadapter/sink/cloudstorage/writer_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • downstreamadapter/sink/cloudstorage/task.go

Comment on lines +131 to +152
func (eg *encodingGroup) add(ctx context.Context, task *task) error {
    if task == nil {
        return errors.New("nil task")
    }

    future := newTaskFuture(task)
    inputIndex, outputIndex := eg.indexer.next(task.dispatcherID)
    // Principle: encoder parallelism and writer ordering are decoupled.
    // Input shard can be round-robin; output shard must be dispatcher-stable.
    select {
    case <-ctx.Done():
        return errors.Trace(ctx.Err())
    case eg.inputCh[inputIndex] <- future:
    }

    select {
    case <-ctx.Done():
        return errors.Trace(ctx.Err())
    case eg.outputCh[outputIndex] <- future:
    }
    return nil
}
Contributor

⚠️ Potential issue | 🟡 Minor

Task may be orphaned in inputCh if second send fails.

If the first send to inputCh succeeds but the context is cancelled before the second send to outputCh, the task is enqueued for encoding but the future never reaches the output channel. The encoder will process it, but no writer will consume the result.

This is mitigated by context propagation causing the encoder to exit, but consider whether this edge case needs explicit handling for graceful shutdown semantics.
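One hedged way to handle it explicitly, assuming a new atomic.Bool field (here called abandoned) were added to taskFuture and checked by the encoder before doing any work; this is a sketch of a possible direction, not the PR's code:

select {
case <-ctx.Done():
    // The future is already queued in inputCh; mark it so the encoder can
    // drop it instead of producing a result nobody will consume.
    future.abandoned.Store(true)
    return errors.Trace(ctx.Err())
case eg.outputCh[outputIndex] <- future:
}
return nil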

Comment on lines 59 to 62
pdlock := pdutil.NewMonotonicClock(pclock.New())
appcontext.SetService(appcontext.DefaultPDClock, pdlock)
mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
Contributor

⚠️ Potential issue | 🟡 Minor

Dead code: first SetService call is immediately overwritten.

Line 60 sets pdlock but line 62 immediately overwrites it with mockPDClock. Remove the redundant first assignment.

🧹 Proposed fix
-	pdlock := pdutil.NewMonotonicClock(pclock.New())
-	appcontext.SetService(appcontext.DefaultPDClock, pdlock)
 	mockPDClock := pdutil.NewClock4Test()
 	appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-	pdlock := pdutil.NewMonotonicClock(pclock.New())
-	appcontext.SetService(appcontext.DefaultPDClock, pdlock)
-	mockPDClock := pdutil.NewClock4Test()
-	appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
+	mockPDClock := pdutil.NewClock4Test()
+	appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer_test.go` around lines 59 - 62, The
first pdlock creation and its registration are redundant because they are
immediately overwritten by mockPDClock; remove the lines that create pdlock
(pdutil.NewMonotonicClock(pclock.New())) and the subsequent
appcontext.SetService call that registers pdlock with appcontext.DefaultPDClock,
leaving only the mockPDClock := pdutil.NewClock4Test() and the final
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) so the test uses
the intended mock clock.

@3AceShowHand
Collaborator Author

/test all

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
downstreamadapter/sink/cloudstorage/dml_writers.go (1)

84-97: ⚠️ Potential issue | 🔴 Critical

Derive and store the errgroup context before wiring shutdown.

setRunCtx(ctx) and the msgCh.Close() goroutine are both bound to the parent context, not the errgroup context created later. If any encoder/writer goroutine fails, submitTaskToEncoder() can stay blocked in msgCh.Get(), and flushDMLBeforeBlock() keeps waiting on a context that never closes. Use the derived errgroup context for both.

💡 Suggested direction
 func (d *dmlWriters) run(ctx context.Context) error {
-	d.setRunCtx(ctx)
+	eg, runCtx := errgroup.WithContext(ctx)
+	d.setRunCtx(runCtx)
 	defer d.setRunCtx(nil)

 	runDone := make(chan struct{})
 	defer close(runDone)

 	go func() {
 		select {
-		case <-ctx.Done():
+		case <-runCtx.Done():
 			d.msgCh.Close()
 		case <-runDone:
 		}
 	}()
-
-	eg, ctx := errgroup.WithContext(ctx)

 	eg.Go(func() error {
 		defer d.encodeGroup.closeInput()
-		return d.submitTaskToEncoder(ctx)
+		return d.submitTaskToEncoder(runCtx)
 	})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/dml_writers.go` around lines 84 - 97, The
run method binds shutdown to the parent ctx instead of the errgroup context so
msgCh.Close and setRunCtx aren't canceled when an encoder goroutine fails;
reorder to call errgroup.WithContext first, capture the derived ctx (e.g.,
egCtx) and setRunCtx(egCtx), then start the goroutine that selects on
egCtx.Done() to call d.msgCh.Close(); this ensures
submitTaskToEncoder()/msgCh.Get() and flushDMLBeforeBlock() observe the errgroup
cancellation when any worker fails.
downstreamadapter/sink/cloudstorage/encoder_group_test.go (1)

58-61: ⚠️ Potential issue | 🟡 Minor

Bound the shard-selection retries.

Both loops can spin forever if NewDispatcherID() or the hash distribution changes. Please cap the attempts and fail explicitly so the suite stays deterministic.

Also applies to: 82-85
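
A bounded version of one such loop might look like this sketch (maxAttempts is an arbitrary cap; the identifier names follow the comment above, and the %d verb assumes the shard index is an integer):

const maxAttempts = 1000

var dispatcherB commonType.DispatcherID
found := false
for i := 0; i < maxAttempts && !found; i++ {
	dispatcherB = commonType.NewDispatcherID()
	found = indexer.routeOutputIndex(dispatcherB) == shardB
}
if !found {
	t.Fatalf("no dispatcher routed to shard %d after %d attempts", shardB, maxAttempts)
}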

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoder_group_test.go` around lines 58 -
61, The shard-selection loops that regenerate dispatcher IDs (using
commonType.NewDispatcherID() and checking indexer.routeOutputIndex(dispatcherX)
into shardA/shardB) must be bounded to avoid infinite spins; add a retry counter
(e.g., maxAttempts constant) and break with an explicit test failure (t.Fatalf
or require.FailNow) if the limit is exceeded, replacing the endless for loops
around dispatcherA/dispatcherB so the test fails deterministically when unique
shards cannot be found after maxAttempts.
downstreamadapter/sink/cloudstorage/encoder_group.go (1)

82-87: ⚠️ Potential issue | 🔴 Critical

Stop producers before closing outputCh.

add() still publishes into eg.outputCh[...], so closing those channels here can race with an in-flight enqueue during cancellation or encoder failure. That turns shutdown into a send-on-closed-channel panic, and if the first enqueue to inputCh already succeeded the future is stranded with no writer-side consumer. Block/finish producers first, then close the output shards.
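
In outline, the ordering being asked for could look like the following sketch — producerWg is a hypothetical WaitGroup tracking callers inside add(), not something the PR currently has:

// 1. Stop accepting new tasks so add() can no longer start a send.
eg.closeInput()

// 2. Wait for in-flight add() calls and the encoder goroutines.
eg.producerWg.Wait() // hypothetical WaitGroup tracking add() callers
_ = g.Wait()

// 3. Only now is it safe to close the writer-facing shards.
for _, ch := range eg.outputCh {
	close(ch)
}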

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/encoder_group.go` around lines 82 - 87,
The shutdown currently closes eg.outputCh immediately after g.Wait(), which
races with add() producers that may still send into eg.outputCh and causes
send-on-closed-channel panics and stranded futures; change shutdown so producers
are completed before closing the output shards: stop or close the input path
that powers add() (e.g., close inputCh or signal producers), wait for the
producer WaitGroup or otherwise block until all add()/producer goroutines have
returned, then iterate over eg.outputCh to close each shard; ensure
encoderGroup.run, add(), eg.outputCh, g.Wait() and the inputCh/producer
WaitGroup are used to order shutdown so no sends occur after the closes.
🧹 Nitpick comments (3)
downstreamadapter/sink/cloudstorage/writer.go (2)

388-392: Consider calling closeInput() within close() for consistent shutdown.

Currently close() only sets the isClosed flag but doesn't close the input channel. Callers must remember to call both closeInput() and close() in the correct order. Consider consolidating the shutdown logic.

♻️ Proposed consolidation
 func (d *writer) close() {
 	if !d.isClosed.CompareAndSwap(false, true) {
 		return
 	}
+	d.inputCh.CloseAndDrain()
 }

Then remove the separate closeInput() method or make it idempotent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 388 - 392, The
close method on writer currently only flips isClosed and leaves the input
channel open; update writer.close() to also call closeInput() (or inline its
logic) after successfully swapping isClosed to true so shutdown is consolidated
and callers don't need to call both; ensure closeInput() is idempotent (or
remove it) and preserve any existing ordering/cleanup semantics used elsewhere
(reference writer.close(), writer.closeInput(), and isClosed.CompareAndSwap).

45-46: Replace deprecated DrainableChann with explicit drain logic.

The inputCh field uses chann.DrainableChann[*task], which is marked deprecated with "Just Don't Use It. Use a channel please." per utils/chann/drainable_chann.go:21. Consider migrating to a plain channel and implementing explicit drain logic in the closeInput() method to prevent goroutine leaks, replacing the automatic behavior of CloseAndDrain() called at line 385.
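
A minimal sketch of that migration, assuming the consumer goroutine has already returned by the time closeInput runs (otherwise both sides would race over the same items) and that toBeFlushedCh is still consumed during shutdown:

type writer struct {
	inputCh chan *task // was *chann.DrainableChann[*task]
	// ...
}

func (d *writer) closeInput() {
	close(d.inputCh)
	// Drain explicitly and forward leftovers rather than discarding them.
	for t := range d.inputCh {
		d.toBeFlushedCh <- t
	}
}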

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 45 - 46, Replace
the deprecated inputCh type (currently chann.DrainableChann[*task]) with a plain
channel (chan *task) and remove calls to CloseAndDrain; update the struct field
declaration for inputCh and adjust send/receive sites accordingly. Implement
explicit drain logic inside closeInput(): close the inputCh, iterate over
remaining values from inputCh until closed, and for each drained *task forward
it into toBeFlushedCh (or perform the same cleanup currently done by
CloseAndDrain) to avoid goroutine leaks. Ensure to preserve ordering and any
cancellation/error handling that writerTask and the existing flush logic expect,
and remove the dependency on chann.DrainableChann methods.
downstreamadapter/sink/cloudstorage/sink.go (1)

284-298: Consider consistent field usage in log statements.

Lines 284-286 use zap.Any("changefeedID", s.changefeedID) while lines 294-295 use separate zap.String("keyspace", ...) and zap.String("changefeed", ...) fields. For consistency across the codebase, consider using the same pattern throughout this function.
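
For example, the marshal-error path could adopt the same consolidated field (the message text here is illustrative, not the one in the file):

log.Panic("failed to marshal checkpoint",
	zap.Any("changefeedID", s.changefeedID),
	zap.Error(err))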

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/sink.go` around lines 284 - 298, The log
field usage is inconsistent: one log uses zap.Any("changefeedID",
s.changefeedID) while another uses zap.String("keyspace",
s.changefeedID.Keyspace()) and zap.String("changefeed", s.changefeedID.Name());
update the panic log in the json.Marshal error path to use the same field
pattern as the earlier warning (or vice versa) for consistency — e.g., replace
the separate "keyspace"/"changefeed" fields with a single "changefeedID" field
using s.changefeedID (or convert the earlier zap.Any to two explicit strings),
updating the zap fields in the marshal error log where s.changefeedID is
referenced so all logs in this code path use the same field naming convention.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/sink/cloudstorage/writer.go`:
- Around line 384-386: closeInput currently calls d.inputCh.CloseAndDrain(),
which can discard items that genAndDispatchTask might still need because
genAndDispatchTask can block sending to toBeFlushedCh while reading from
d.inputCh.Out(), causing lost work; modify shutdown so you stop writers without
draining: replace the DrainableChann usage with a plain channel (or keep the
channel but stop calling CloseAndDrain) and implement explicit shutdown
coordination between writer.closeInput (or writer.Close) and genAndDispatchTask
— e.g., signal a done/closing channel or context that causes genAndDispatchTask
to finish processing all in-flight items and return, then close the input
channel or stop producers; update references to inputCh, closeInput,
genAndDispatchTask, and toBeFlushedCh to use this coordinated shutdown so no
items are silently discarded.

---

Duplicate comments:
In `@downstreamadapter/sink/cloudstorage/dml_writers.go`:
- Around line 84-97: The run method binds shutdown to the parent ctx instead of
the errgroup context so msgCh.Close and setRunCtx aren't canceled when an
encoder goroutine fails; reorder to call errgroup.WithContext first, capture the
derived ctx (e.g., egCtx) and setRunCtx(egCtx), then start the goroutine that
selects on egCtx.Done() to call d.msgCh.Close(); this ensures
submitTaskToEncoder()/msgCh.Get() and flushDMLBeforeBlock() observe the errgroup
cancellation when any worker fails.

In `@downstreamadapter/sink/cloudstorage/encoder_group_test.go`:
- Around line 58-61: The shard-selection loops that regenerate dispatcher IDs
(using commonType.NewDispatcherID() and checking
indexer.routeOutputIndex(dispatcherX) into shardA/shardB) must be bounded to
avoid infinite spins; add a retry counter (e.g., maxAttempts constant) and break
with an explicit test failure (t.Fatalf or require.FailNow) if the limit is
exceeded, replacing the endless for loops around dispatcherA/dispatcherB so the
test fails deterministically when unique shards cannot be found after
maxAttempts.

In `@downstreamadapter/sink/cloudstorage/encoder_group.go`:
- Around line 82-87: The shutdown currently closes eg.outputCh immediately after
g.Wait(), which races with add() producers that may still send into eg.outputCh
and causes send-on-closed-channel panics and stranded futures; change shutdown
so producers are completed before closing the output shards: stop or close the
input path that powers add() (e.g., close inputCh or signal producers), wait for
the producer WaitGroup or otherwise block until all add()/producer goroutines
have returned, then iterate over eg.outputCh to close each shard; ensure
encoderGroup.run, add(), eg.outputCh, g.Wait() and the inputCh/producer
WaitGroup are used to order shutdown so no sends occur after the closes.

---

Nitpick comments:
In `@downstreamadapter/sink/cloudstorage/sink.go`:
- Around line 284-298: The log field usage is inconsistent: one log uses
zap.Any("changefeedID", s.changefeedID) while another uses
zap.String("keyspace", s.changefeedID.Keyspace()) and zap.String("changefeed",
s.changefeedID.Name()); update the panic log in the json.Marshal error path to
use the same field pattern as the earlier warning (or vice versa) for
consistency — e.g., replace the separate "keyspace"/"changefeed" fields with a
single "changefeedID" field using s.changefeedID (or convert the earlier zap.Any
to two explicit strings), updating the zap fields in the marshal error log where
s.changefeedID is referenced so all logs in this code path use the same field
naming convention.

In `@downstreamadapter/sink/cloudstorage/writer.go`:
- Around line 388-392: The close method on writer currently only flips isClosed
and leaves the input channel open; update writer.close() to also call
closeInput() (or inline its logic) after successfully swapping isClosed to true
so shutdown is consolidated and callers don't need to call both; ensure
closeInput() is idempotent (or remove it) and preserve any existing
ordering/cleanup semantics used elsewhere (reference writer.close(),
writer.closeInput(), and isClosed.CompareAndSwap).
- Around line 45-46: Replace the deprecated inputCh type (currently
chann.DrainableChann[*task]) with a plain channel (chan *task) and remove calls
to CloseAndDrain; update the struct field declaration for inputCh and adjust
send/receive sites accordingly. Implement explicit drain logic inside
closeInput(): close the inputCh, iterate over remaining values from inputCh
until closed, and for each drained *task forward it into toBeFlushedCh (or
perform the same cleanup currently done by CloseAndDrain) to avoid goroutine
leaks. Ensure to preserve ordering and any cancellation/error handling that
writerTask and the existing flush logic expect, and remove the dependency on
chann.DrainableChann methods.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5e4e4b43-6f19-4c21-b78c-24fd4eca9ef2

📥 Commits

Reviewing files that changed from the base of the PR and between 2e7aaa7 and 421ff6e.

📒 Files selected for processing (11)
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • downstreamadapter/sink/cloudstorage/dml_writers_test.go
  • downstreamadapter/sink/cloudstorage/encoder_group.go
  • downstreamadapter/sink/cloudstorage/encoder_group_test.go
  • downstreamadapter/sink/cloudstorage/indexer.go
  • downstreamadapter/sink/cloudstorage/sink.go
  • downstreamadapter/sink/cloudstorage/sink_test.go
  • downstreamadapter/sink/cloudstorage/task.go
  • downstreamadapter/sink/cloudstorage/task_test.go
  • downstreamadapter/sink/cloudstorage/writer.go
  • downstreamadapter/sink/cloudstorage/writer_test.go

Comment on lines +384 to +386
func (d *writer) closeInput() {
	d.inputCh.CloseAndDrain()
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat utils/chann/drainable_chann.go

Repository: pingcap/ticdc

Length of output: 2193


🏁 Script executed:

sed -n '300,400p' downstreamadapter/sink/cloudstorage/writer.go

Repository: pingcap/ticdc

Length of output: 2492


🏁 Script executed:

rg "closeInput" downstreamadapter/sink/cloudstorage/writer.go -B 3 -A 3

Repository: pingcap/ticdc

Length of output: 124


🏁 Script executed:

rg "closeInput" --type go

Repository: pingcap/ticdc

Length of output: 807


🏁 Script executed:

rg "closeInput" downstreamadapter/sink/cloudstorage/dml_writers.go -B 10 -A 5

Repository: pingcap/ticdc

Length of output: 735


🏁 Script executed:

rg "func \(d \*writer\) run" downstreamadapter/sink/cloudstorage/writer.go -A 20

Repository: pingcap/ticdc

Length of output: 579


🏁 Script executed:

sed -n '1,100p' downstreamadapter/sink/cloudstorage/writer_test.go | head -150

Repository: pingcap/ticdc

Length of output: 3449


🏁 Script executed:

rg "closeInput" downstreamadapter/sink/cloudstorage/writer_test.go -B 10 -A 15

Repository: pingcap/ticdc

Length of output: 1449


closeInput() drains items that may not be fully processed by genAndDispatchTask.

CloseAndDrain() closes the channel and drains remaining items via for range ch.Out(), discarding them. Since genAndDispatchTask can be blocked on toBeFlushedCh sends while reading from d.inputCh.Out(), items may be drained and lost without being processed. This is particularly concerning given the deprecation notice on DrainableChann itself: "Deprecated: Just Don't Use It. Use a channel please" with a warning to "make sure that it is safe to drain rest elements in the channel before closing the channel."

Consider using a plain channel and explicit shutdown coordination instead of relying on DrainableChann.
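
One possible coordination scheme, sketched with hypothetical names (closing, producerWg, and dispatch are illustrative, not in the PR):

// closeInput stops producers before touching the channel, so no send
// can race the close and the consumer keeps every queued item.
func (d *writer) closeInput() {
	close(d.closing)    // 1. hypothetical signal producers check before sending
	d.producerWg.Wait() // 2. hypothetical wait until no send is in flight
	close(d.inputCh)    // 3. consumer now finishes the backlog and exits
}

func (d *writer) genAndDispatchTask() {
	for t := range d.inputCh { // exits cleanly once inputCh is closed and empty
		d.dispatch(t) // stand-in for the existing flush logic
	}
}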

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/sink/cloudstorage/writer.go` around lines 384 - 386,
closeInput currently calls d.inputCh.CloseAndDrain(), which can discard items
that genAndDispatchTask might still need because genAndDispatchTask can block
sending to toBeFlushedCh while reading from d.inputCh.Out(), causing lost work;
modify shutdown so you stop writers without draining: replace the DrainableChann
usage with a plain channel (or keep the channel but stop calling CloseAndDrain)
and implement explicit shutdown coordination between writer.closeInput (or
writer.Close) and genAndDispatchTask — e.g., signal a done/closing channel or
context that causes genAndDispatchTask to finish processing all in-flight items
and return, then close the input channel or stop producers; update references to
inputCh, closeInput, genAndDispatchTask, and toBeFlushedCh to use this
coordinated shutdown so no items are silently discarded.

@3AceShowHand
Collaborator Author

/test all

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
pkg/sink/cloudstorage/path.go (1)

145-168: ⚠️ Potential issue | 🔴 Critical

Key path state by the emitted namespace, not raw VersionedTableName.

Line 148 makes DispatcherID part of VersionedTableName, and Lines 160/168 then use that full struct as the cache key. When EnableTableAcrossNodes=false, dispatcher ID is not part of the real directory or filename namespace, so a dispatcher handoff creates separate in-memory counters for the same output path and reintroduces the stale-index / duplicate-name problem. Please normalize these caches so dispatcher ID is included only when the emitted path actually includes it.
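
A sketch of that normalization — field and config names are taken from the comment above, and it assumes VersionedTableName remains a comparable struct usable as a map key:

// cacheKey drops DispatcherID whenever the emitted path does not
// contain it, so fileIndex and versionMap are shared per output path.
func (f *FilePathGenerator) cacheKey(tbl VersionedTableName) VersionedTableName {
	if !f.config.EnableTableAcrossNodes {
		var zero commonType.DispatcherID
		tbl.DispatcherID = zero
	}
	return tbl
}

Lookups would then read f.fileIndex[f.cacheKey(tbl)] instead of keying on the raw struct.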

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/cloudstorage/path.go` around lines 145 - 168, The maps fileIndex and
versionMap use the full VersionedTableName (which includes DispatcherID) as
keys; when EnableTableAcrossNodes is false this causes separate counters for the
same emitted path across dispatchers. Modify FilePathGenerator to derive a cache
key based on the actual emitted namespace (strip DispatcherID from
VersionedTableName when config.EnableTableAcrossNodes is false) and use that
derived key for fileIndex and versionMap lookups/updates; when
config.EnableTableAcrossNodes is true keep DispatcherID in the key. Update all
accesses that read/write fileIndex and versionMap to use this normalized key so
counters and versions are shared for the same output path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/sink/cloudstorage/table_definition.go`:
- Around line 194-197: The new TableVersion field on the table schema may be
zero for existing on-disk JSON; update the decode path so ToDDLEvent() does not
blindly use TableVersion as FinishedTs when TableVersion==0: detect
TableVersion==0 inside the schema decoding/ToDDLEvent() flow and parse the
TableVersion from the schema file name (pattern
"schema_{TableVersion}_{checksum}.json" produced by tableInfoVersion path
generation) as a fallback, then use that parsed value for FinishedTs; ensure
this backfill happens before any logic that relies on TableVersion so historical
sink output retains the correct timestamp.

---

Duplicate comments:
In `@pkg/sink/cloudstorage/path.go`:
- Around line 145-168: The maps fileIndex and versionMap use the full
VersionedTableName (which includes DispatcherID) as keys; when
EnableTableAcrossNodes is false this causes separate counters for the same
emitted path across dispatchers. Modify FilePathGenerator to derive a cache key
based on the actual emitted namespace (strip DispatcherID from
VersionedTableName when config.EnableTableAcrossNodes is false) and use that
derived key for fileIndex and versionMap lookups/updates; when
config.EnableTableAcrossNodes is true keep DispatcherID in the key. Update all
accesses that read/write fileIndex and versionMap to use this normalized key so
counters and versions are shared for the same output path.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9f8865eb-ec9d-4e58-a66a-da27e3a75a8c

📥 Commits

Reviewing files that changed from the base of the PR and between 421ff6e and 319232d.

📒 Files selected for processing (5)
  • pkg/common/event/dml_event.go
  • pkg/sink/cloudstorage/path.go
  • pkg/sink/cloudstorage/path_key.go
  • pkg/sink/cloudstorage/path_test.go
  • pkg/sink/cloudstorage/table_definition.go

Comment on lines +194 to 197
	// TableVersion is the schema version encoded into schema file name:
	// schema_{TableVersion}_{checksum}.json.
	// It is passed from tableInfoVersion in path generation.
	TableVersion uint64 `json:"TableVersion"`

⚠️ Potential issue | 🟠 Major

Preserve compatibility with pre-existing schema files.

This adds a new on-disk TableVersion field, but old schema JSON in cloud storage will unmarshal with TableVersion == 0. ToDDLEvent() uses that field as FinishedTs, so reading historical sink output after upgrade can silently produce zero timestamps unless the decode path backfills it from the schema file path. Please add that fallback before relying on this field.
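
A fallback along those lines might look like this sketch — the file-name pattern comes from the doc comment above, while the helper name and error handling are illustrative (imports: path/filepath, strings, strconv, fmt):

// parseTableVersionFromPath recovers the version from a file name of
// the form schema_{TableVersion}_{checksum}.json when the JSON body
// predates the field and unmarshals TableVersion as zero.
func parseTableVersionFromPath(p string) (uint64, error) {
	base := strings.TrimSuffix(filepath.Base(p), ".json")
	parts := strings.Split(base, "_")
	if len(parts) != 3 || parts[0] != "schema" {
		return 0, fmt.Errorf("unexpected schema file name: %s", p)
	}
	return strconv.ParseUint(parts[1], 10, 64)
}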

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/cloudstorage/table_definition.go` around lines 194 - 197, The new
TableVersion field on the table schema may be zero for existing on-disk JSON;
update the decode path so ToDDLEvent() does not blindly use TableVersion as
FinishedTs when TableVersion==0: detect TableVersion==0 inside the schema
decoding/ToDDLEvent() flow and parse the TableVersion from the schema file name
(pattern "schema_{TableVersion}_{checksum}.json" produced by tableInfoVersion
path generation) as a fallback, then use that parsed value for FinishedTs;
ensure this backfill happens before any logic that relies on TableVersion so
historical sink output retains the correct timestamp.

@ti-chi-bot

ti-chi-bot bot commented Mar 6, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@3AceShowHand
Collaborator Author

/test all

@ti-chi-bot

ti-chi-bot bot commented Mar 6, 2026

@3AceShowHand: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name                            Commit   Details  Required  Rerun command
pull-cdc-storage-integration-heavy   16c94ff  link     true      /test pull-cdc-storage-integration-heavy
pull-cdc-mysql-integration-light     16c94ff  link     true      /test pull-cdc-mysql-integration-light
pull-cdc-storage-integration-light   16c94ff  link     true      /test pull-cdc-storage-integration-light
pull-cdc-kafka-integration-heavy     16c94ff  link     true      /test pull-cdc-kafka-integration-heavy

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
