sink(cloudstorage): add use-table-id-as-path option #4356
yinshuangfei wants to merge 5 commits into pingcap:master
Summary of Changes (Gemini Code Assist)
This pull request enhances the cloud storage sink by introducing a new configuration option.
📝 Walkthrough
Sequence Diagram(s): skipped (changes are file/system-wide feature additions without a single new multi-component sequential flow that warrants a diagram).
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Code Review
This pull request introduces a use-table-id-as-path option for the cloud storage sink, enabling paths to be constructed using table IDs instead of table names. While the implementation is well-tested and appears correct, a security audit identified that the use of log.Panic for handling invalid or unexpected input from external sources (DDL events or storage files) could lead to Denial of Service (DoS). It is recommended to replace panics with proper error handling to ensure the stability of the replication process. Additionally, there are minor refactoring opportunities to improve code reuse and test structure.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@downstreamadapter/sink/cloudstorage/sink_test.go`:
- Around line 289-293: The test is missing setup of the mock PD clock before
creating the sink; call appcontext.SetService to register the mock PD clock
service (same mock used in other tests) prior to invoking newSinkForTest so the
sink and any PD clock accessors see the mock; specifically, in the test where
ctx, cancel := context.WithCancel(context.Background()) and cloudStorageSink,
err := newSinkForTest(...) are created, insert the mock PD clock registration
(via appcontext.SetService(mockPDClock)) using the same mock instance used by
TestWriteDDLEvent/TestWriteDDLEventWithTableIDAsPath and ensure it is cleaned up
or reset after the test.
📒 Files selected for processing (8)
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- pkg/sink/cloudstorage/config.go
- pkg/sink/cloudstorage/config_test.go
- pkg/sink/cloudstorage/path.go
- pkg/sink/cloudstorage/path_test.go
- pkg/sink/cloudstorage/table_definition.go
- pkg/sink/cloudstorage/table_definition_test.go
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/sink/cloudstorage/table_definition.go`:
- Around line 334-343: The GenerateSchemaFilePath function allows
useTableIDAsPath to produce a "0" path when tableID is zero; add a guard at the
top of GenerateSchemaFilePath to validate tableID (e.g., tableID > 0) when
useTableIDAsPath is true and return a clear error if invalid, before calling
generateTablePath or computing checksum, so callers fail fast instead of writing
into an ambiguous "0" directory; reference: GenerateSchemaFilePath,
useTableIDAsPath, tableID, and generateTablePath.
📒 Files selected for processing (2)
- downstreamadapter/sink/cloudstorage/sink_test.go
- pkg/sink/cloudstorage/table_definition.go
9326b6d to 7296adb
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/sink/cloudstorage/table_definition_test.go`:
- Around line 524-529: The test assertions in table_definition_test.go expect
the old error text "tableID must be positive" but GenerateSchemaFilePath now
returns "invalid table id for table-id path"; update the two assertions that
call GenerateSchemaFilePath (the cases with tableID 0 and -1) to assert that
err.Error() contains "invalid table id for table-id path" instead of the old
string so the test matches the current implementation.
📒 Files selected for processing (2)
- pkg/sink/cloudstorage/table_definition.go
- pkg/sink/cloudstorage/table_definition_test.go
1e5ab99 to 3302635
Actionable comments posted: 1
🧹 Nitpick comments (1)
pkg/sink/cloudstorage/table_definition_test.go (1)
522-529: Add one test that documents intended behavior when tableID is unset.
These assertions currently codify hard-fail on `tableID == 0/-1`. Please add a companion case that captures the intended production behavior for missing IDs (fallback vs fail-fast), so the contract stays explicit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/cloudstorage/table_definition_test.go` around lines 522 - 529, Add a companion unit test for the missing/unset tableID case that calls def.GenerateSchemaFilePath(false, 0) (i.e., use-table-id-as-path = false with tableID == 0) and assert the function returns no error and a schema path that uses the table name/default fallback rather than failing; this makes the contract explicit alongside the existing failure assertions for GenerateSchemaFilePath(true, 0) and GenerateSchemaFilePath(true, -1).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/sink/cloudstorage/table_definition.go`:
- Around line 355-360: The check that hard-fails when tableID <= 0 inside the
useTableIDAsPath branch should be changed to gracefully fall back to using the
table name path instead of returning
errors.ErrInternalCheckFailed.GenWithStackByArgs; update the logic around the
useTableIDAsPath && t.Table branch (referencing useTableIDAsPath, t.Table,
tableID, and the current error return) so that if tableID is unset/<=0 you skip
the error and proceed to generate a path based on t.Table (or the existing
name-based path logic) rather than aborting.
📒 Files selected for processing (3)
- pkg/sink/cloudstorage/path.go
- pkg/sink/cloudstorage/table_definition.go
- pkg/sink/cloudstorage/table_definition_test.go
/check-issue-triage-complete
[APPROVALNOTIFIER] This PR is NOT APPROVED
@coderabbitai review
✅ Actions performed: Review triggered.
@coderabbitai resume
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/cloudstorage/path.go (1)
398-418: ⚠️ Potential issue | 🟠 Major
Avoid duplicating the physical table ID in partition paths.
With both `UseTableIDAsPath` and `EnablePartitionSeparator` enabled, Line 400-403 and Line 417 both append `tbl.TableNameWithPhysicTableID.TableID`. For partitioned tables this generates `<physicalTableID>/<version>/<physicalTableID>/...`, so the extra segment stops carrying distinct partition information and changes the path layout unexpectedly.
💡 One safe direction if physical table ID is already the top-level key
- if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition {
+ if f.config.EnablePartitionSeparator &&
+     tbl.TableNameWithPhysicTableID.IsPartition &&
+     !f.config.UseTableIDAsPath {
      elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithPhysicTableID.TableID))
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/cloudstorage/path.go` around lines 398 - 418, The code appends the physical TableID twice when both f.config.UseTableIDAsPath and f.config.EnablePartitionSeparator are true; update the partition-separator block so it only appends the extra TableID when it is not already included by generateTablePath. Concretely, in the branch that currently appends fmt.Sprintf("%d", tbl.TableNameWithPhysicTableID.TableID) under f.config.EnablePartitionSeparator, add a guard to skip this append when f.config.UseTableIDAsPath is true (or when generateTablePath was called with the TableID-included flag), referencing f.config.UseTableIDAsPath, f.config.EnablePartitionSeparator, tbl.TableNameWithPhysicTableID.TableID, generateTablePath, and f.versionMap[tbl].
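The duplicated segment and the suggested guard can be reproduced in a small, self-contained sketch. The types `pathConfig` and `tableRef` and the function `dataDir` are hypothetical simplifications of the structures named in the comment, and the `<table>/<version>/...` layout is assumed from the path quoted above.

```go
package main

import (
	"fmt"
	"strings"
)

// pathConfig and tableRef are hypothetical, reduced versions of the sink
// config and table identity used by the path generator.
type pathConfig struct {
	UseTableIDAsPath         bool
	EnablePartitionSeparator bool
}

type tableRef struct {
	Schema, Table string
	TableID       int64 // physical table ID
	IsPartition   bool
}

// dataDir builds the directory for data files. The partition separator is
// skipped when the physical table ID is already the top-level key.
func dataDir(cfg pathConfig, tbl tableRef, version uint64) string {
	var elems []string
	if cfg.UseTableIDAsPath {
		elems = append(elems, fmt.Sprintf("%d", tbl.TableID))
	} else {
		elems = append(elems, tbl.Schema, tbl.Table)
	}
	elems = append(elems, fmt.Sprintf("%d", version))
	if cfg.EnablePartitionSeparator && tbl.IsPartition && !cfg.UseTableIDAsPath {
		elems = append(elems, fmt.Sprintf("%d", tbl.TableID))
	}
	return strings.Join(elems, "/")
}

func main() {
	tbl := tableRef{Schema: "test", Table: "t1", TableID: 101, IsPartition: true}
	both := pathConfig{UseTableIDAsPath: true, EnablePartitionSeparator: true}
	fmt.Println(dataDir(both, tbl, 5)) // prints "101/5"
	byName := pathConfig{EnablePartitionSeparator: true}
	fmt.Println(dataDir(byName, tbl, 5)) // prints "test/t1/5/101"
}
```

Without the `!cfg.UseTableIDAsPath` condition, the first call would yield `101/5/101`, which is the duplication the comment describes.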
🧹 Nitpick comments (1)
pkg/sink/cloudstorage/path.go (1)
218-232: Remove the second invalid-table-ID check.
Line 218 already routes through `def.GenerateSchemaFilePath(...)`, which is the canonical validation point for `use-table-id-as-path`. The extra branch at Line 230-232 is dead on this path and duplicates the error literal, so the two sites can drift again.
♻️ Suggested cleanup
  tblSchemaFile, err := def.GenerateSchemaFilePath(f.config.UseTableIDAsPath, table.TableNameWithPhysicTableID.TableID)
  if err != nil {
      return false, err
  }
  exist, err := f.storage.FileExists(ctx, tblSchemaFile)
  if err != nil {
      return false, err
  }
  if exist {
      f.versionMap[table] = table.TableInfoVersion
      return false, nil
  }
- if f.config.UseTableIDAsPath && table.TableNameWithPhysicTableID.TableID <= 0 {
-     return false, errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table id for table-id path")
- }
Based on learnings: `GenerateSchemaFilePath(useTableIDAsPath bool, tableID int64)` intentionally returns `ErrInternalCheckFailed` when `tableID <= 0` and `useTableIDAsPath=true`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/cloudstorage/path.go` around lines 218 - 232, The code duplicates validation for table IDs: GenerateSchemaFilePath(f.config.UseTableIDAsPath, table.TableNameWithPhysicTableID.TableID) already returns ErrInternalCheckFailed when UseTableIDAsPath is true and tableID <= 0, so remove the redundant conditional that checks f.config.UseTableIDAsPath && table.TableNameWithPhysicTableID.TableID <= 0 (the branch after exist check that returns ErrInternalCheckFailed) to avoid dead/duplicated validation; keep the call to def.GenerateSchemaFilePath, the FileExists check (f.storage.FileExists), and the versionMap assignment (f.versionMap[table] = table.TableInfoVersion) untouched.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@downstreamadapter/sink/cloudstorage/sink.go`:
- Around line 227-234: The early-return in sink.writeFile incorrectly uses
!def.IsTableSchema(), causing invalid table-level DDLs to be silently skipped in
UseTableIDAsPath mode; change the condition to explicitly check for a true
database-level DDL by using def.Table == "" (and keep the surrounding
cfg.UseTableIDAsPath check) so only genuine schema-level events are skipped and
invalid table defs will proceed to GenerateSchemaFilePath and surface validation
errors; reference symbols: sink.writeFile, cfg.UseTableIDAsPath, def.Table,
def.IsTableSchema(), GenerateSchemaFilePath.
In `@pkg/config/sink.go`:
- Around line 954-965: The compatibility check must include the new
UseTableIDAsPath flag so sink-URI-only changes are detected: update
CheckCompatibilityWithSinkURI to parse UseTableIDAsPath from params (as done in
the shown block) and compare it against the current s.UseTableIDAsPath value; if
they differ, populate cfgInSinkURI and cfgInFile for UseTableIDAsPath (similar
to Protocol and TxnAtomicity) and do not let the function return early ignoring
this difference. Ensure you reference the same symbols (UseTableIDAsPath,
CheckCompatibilityWithSinkURI, cfgInSinkURI, cfgInFile, params.Get) when adding
this comparison so URI-driven toggles are correctly treated as
compatibility-breaking.
📒 Files selected for processing (11)
- api/v2/model.go
- api/v2/model_test.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- pkg/config/changefeed.go
- pkg/config/sink.go
- pkg/sink/cloudstorage/config.go
- pkg/sink/cloudstorage/path.go
- pkg/sink/cloudstorage/path_test.go
- pkg/sink/cloudstorage/table_definition.go
- pkg/sink/cloudstorage/table_definition_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/sink/cloudstorage/path_test.go
✅ Actions performed: Reviews resumed.
Actionable comments posted: 1
♻️ Duplicate comments (1)
pkg/config/sink.go (1)
954-965: ⚠️ Potential issue | 🟠 Major
Add `UseTableIDAsPath` to the compatibility delta check.
This block makes `use-table-id-as-path` URI-governed, but `CheckCompatibilityWithSinkURI` still only keys off `Protocol` and `TxnAtomicity`. A changefeed update that only toggles `UseTableIDAsPath` can therefore return early and miss a real URI/config conflict.
🐛 Proposed fix
- cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
-     s.TxnAtomicity != oldSinkConfig.TxnAtomicity
+ cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
+     s.TxnAtomicity != oldSinkConfig.TxnAtomicity ||
+     s.UseTableIDAsPath != oldSinkConfig.UseTableIDAsPath
Also applies to: 997-1014
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/config/sink.go` around lines 954 - 965, The change makes UseTableIDAsPath (s.UseTableIDAsPath) driven from the sink URI but CheckCompatibilityWithSinkURI still ignores that key, so updates toggling UseTableIDAsPath can bypass compatibility checks; update CheckCompatibilityWithSinkURI to include UseTableIDAsPath in the compatibility delta comparison (same style as Protocol/TxnAtomicity), using the same config key string (UseTableIDAsPathKey) and comparing util.GetOrZero(s.UseTableIDAsPath) with the parsed/enabled value from cfgInSinkURI/cfgInFile, and ensure cfgInSinkURI and cfgInFile are populated consistently where UseTableIDAsPath is parsed so the function can detect and return conflicts correctly.
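The early-return problem described here can be shown with a reduced sketch. `sinkConfig` and `uriGovernedParamsChanged` are hypothetical names; the real config fields appear to be pointer-typed (the comment mentions `util.GetOrZero`), whereas plain values are used below to keep the example short.

```go
package main

import "fmt"

// sinkConfig is a hypothetical, flattened view of the URI-governed fields.
type sinkConfig struct {
	Protocol         string
	TxnAtomicity     string
	UseTableIDAsPath bool
}

// uriGovernedParamsChanged reports whether any field that can also be set
// through the sink URI differs between the new and old config. If it returns
// false, a compatibility check may skip the URI/config conflict comparison.
func uriGovernedParamsChanged(newCfg, oldCfg sinkConfig) bool {
	return newCfg.Protocol != oldCfg.Protocol ||
		newCfg.TxnAtomicity != oldCfg.TxnAtomicity ||
		newCfg.UseTableIDAsPath != oldCfg.UseTableIDAsPath
}

func main() {
	oldCfg := sinkConfig{Protocol: "csv", TxnAtomicity: "none"}
	newCfg := oldCfg
	newCfg.UseTableIDAsPath = true
	// Only the new flag changed; the delta check still has to notice it.
	fmt.Println(uriGovernedParamsChanged(newCfg, oldCfg)) // prints "true"
}
```

If the third comparison were missing, the call above would return false and an update that only toggles the flag would bypass the conflict check.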
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/sink/cloudstorage/path.go`:
- Around line 127-131: The generateTablePath function currently returns any
tableID as a string which allows useTableIDAsPath=true to produce "0/..." paths;
change the logic to reject non-positive IDs the same way GenerateSchemaFilePath
does: when useTableIDAsPath is true and tableID <= 0 return an error
(ErrInternalCheckFailed) instead of silently returning "0", and update callers
that build data/index directories (the code paths referenced around the
data/index generation using generateTablePath) to handle/propagate that error so
data/index path construction fails early rather than creating mixed ID/name
paths.
📒 Files selected for processing (11)
- api/v2/model.go
- api/v2/model_test.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- pkg/config/changefeed.go
- pkg/config/sink.go
- pkg/sink/cloudstorage/config.go
- pkg/sink/cloudstorage/path.go
- pkg/sink/cloudstorage/path_test.go
- pkg/sink/cloudstorage/table_definition.go
- pkg/sink/cloudstorage/table_definition_test.go
What problem does this PR solve?
Issue Number: close #4357
What is changed and how it works?
Add the `use-table-id-as-path` option.
The `use-table-id-as-path` option switches the path to use table_id instead of table_name when it is set to `true`.
With `use-table-id-as-path=true` configured in the sink URI, for example `--sink-uri="s3://cdc&use-table-id-as-path=true"`, the CDC path changes from a layout keyed by table name to one keyed by table ID.
In this mode, cloud storage path generation omits the schema component and DB schema writes are skipped.
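As a rough illustration of the behavior described above, the sketch below reads the flag from a sink URI and picks the table path prefix accordingly. The helper names `useTableIDAsPath` and `tablePath` are hypothetical, the default of `false` for a missing parameter is an assumption, and the URI here starts its query string with `?`, which is also an assumption about the intended form.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// useTableIDAsPath reads the use-table-id-as-path query parameter from a
// sink URI. A missing parameter is treated as false (assumed default).
func useTableIDAsPath(sinkURI string) (bool, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return false, err
	}
	raw := u.Query().Get("use-table-id-as-path")
	if raw == "" {
		return false, nil
	}
	return strconv.ParseBool(raw)
}

// tablePath returns the leading path component(s) for a table. When byID is
// true the schema is omitted and only the numeric table ID is used.
func tablePath(schema, table string, tableID int64, byID bool) string {
	if byID {
		return strconv.FormatInt(tableID, 10)
	}
	return schema + "/" + table
}

func main() {
	byID, err := useTableIDAsPath("s3://cdc?use-table-id-as-path=true")
	if err != nil {
		fmt.Println("bad sink uri:", err)
		return
	}
	fmt.Println(tablePath("test", "t1", 101, byID)) // prints "101"
	fmt.Println(tablePath("test", "t1", 101, false)) // prints "test/t1"
}
```

One consequence of keying by ID is that the path no longer changes when a table is renamed, while the name-based layout does.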
`use_table_id_as_path` is carried into v2 API conversion and sink config parsing.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
`use-table-id-as-path` configuration parameter for the cloud storage sink, enabling file organization by numeric table IDs instead of table names.
Bug Fixes
Tests