Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/cdc/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func (o *createChangefeedOptions) completeReplicaCfg() error {
if err != nil {
return err
}

// CLI create runs pre-validation on the local machine, so skip redo
// storage I/O checks here and keep validation in cdc server.
cfg.EnableRedoIOCheck = putil.AddressOf(false)
err = cfg.ValidateAndAdjust(uri)
if err != nil {
return err
Expand Down
45 changes: 45 additions & 0 deletions cmd/cdc/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,48 @@ func TestChangefeedCreateCli(t *testing.T) {
require.NoError(t, o.complete(f))
require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`")
}

func TestCompleteReplicaCfgSkipConsistentStorageIOCheckInCLI(t *testing.T) {
t.Parallel()

o := newCreateChangefeedOptions(newChangefeedCommonOptions())
o.commonChangefeedOptions.sinkURI = "blackhole://"

dir := t.TempDir()
configPath := filepath.Join(dir, "cf.toml")
content := `
[consistent]
level = "eventual"
storage = "s3:///test/prefix"
`
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
o.commonChangefeedOptions.configFile = configPath

// The CLI still validates replica config, but it skips storage I/O check.
// Therefore this should pass even if the S3 URI misses bucket info.
require.NoError(t, o.completeReplicaCfg())
require.Equal(t, "eventual", *o.cfg.Consistent.Level)
require.Equal(t, "s3:///test/prefix", *o.cfg.Consistent.Storage)
}

func TestCompleteReplicaCfgStillValidateReplicaConfigInCLI(t *testing.T) {
t.Parallel()

o := newCreateChangefeedOptions(newChangefeedCommonOptions())
o.commonChangefeedOptions.sinkURI = "blackhole://"

dir := t.TempDir()
configPath := filepath.Join(dir, "cf.toml")
content := `
[consistent]
level = "eventual"
storage = "nfs:///ticdc-cli-should-not-io-check"
compression = "snappy"
`
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
o.commonChangefeedOptions.configFile = configPath

err := o.completeReplicaCfg()
require.Error(t, err)
require.Contains(t, err.Error(), "consistent.compression")
}
2 changes: 2 additions & 0 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ func TestScaleNode(t *testing.T) {
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())
appcontext.SetService(appcontext.SchemaStore, eventservice.NewMockSchemaStore())
nodeManager.GetAliveNodes()[info.ID] = info
cfg := config.NewDefaultMessageCenterConfig(info.AdvertiseAddr)
mc1 := messaging.NewMessageCenter(ctx, info.ID, cfg, nil)
Expand Down
10 changes: 9 additions & 1 deletion pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ type ConsistentMemoryUsage struct {
}

// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
// The exported API keeps the default behavior and always enables redo storage
// I/O checks for normal callers.
func (c *ConsistentConfig) ValidateAndAdjust() error {
return c.validateAndAdjust(true)
}

// validateAndAdjust is an internal helper that allows toggling redo storage
// I/O checks. enableIOCheck=false is only used by CLI-side pre-validation.
func (c *ConsistentConfig) validateAndAdjust(enableIOCheck bool) error {
if !redo.IsConsistentEnabled(util.GetOrZero(c.Level)) {
return nil
}
Expand Down Expand Up @@ -116,7 +124,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
fmt.Sprintf("invalid storage uri: %s", util.GetOrZero(c.Storage)))
}
return redo.ValidateStorage(uri)
return redo.ValidateStorageWithOptions(uri, redo.StorageValidationOptions{EnableIOCheck: enableIOCheck})
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
Expand Down
14 changes: 13 additions & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var defaultReplicaConfig = &ReplicaConfig{
MemoryQuota: util.AddressOf(uint64(DefaultChangefeedMemoryQuota)),
CaseSensitive: util.AddressOf(false),
CheckGCSafePoint: util.AddressOf(true),
EnableRedoIOCheck: util.AddressOf(true),
EnableSyncPoint: util.AddressOf(false),
EnableTableMonitor: util.AddressOf(false),
SyncPointInterval: util.AddressOf(10 * time.Minute),
Expand Down Expand Up @@ -146,6 +147,9 @@ type replicaConfig struct {
CaseSensitive *bool `toml:"case-sensitive" json:"case-sensitive,omitempty"`
ForceReplicate *bool `toml:"force-replicate" json:"force-replicate,omitempty"`
CheckGCSafePoint *bool `toml:"check-gc-safe-point" json:"check-gc-safe-point,omitempty"`
// EnableRedoIOCheck controls whether consistency storage validation should
// perform an I/O accessibility check. This field is internal only.
EnableRedoIOCheck *bool `toml:"-" json:"-"`
// EnableSyncPoint is only available when the downstream is a Database.
EnableSyncPoint *bool `toml:"enable-sync-point" json:"enable-sync-point,omitempty"`
EnableTableMonitor *bool `toml:"enable-table-monitor" json:"enable-table-monitor"`
Expand Down Expand Up @@ -250,6 +254,9 @@ func (c *ReplicaConfig) Clone() *ReplicaConfig {
log.Panic("failed to unmarshal replica config",
zap.Error(cerror.WrapError(cerror.ErrDecodeFailed, err)))
}
if c.EnableRedoIOCheck != nil {
clone.EnableRedoIOCheck = util.AddressOf(*c.EnableRedoIOCheck)
}
return clone
}

Expand All @@ -267,6 +274,11 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {

// ValidateAndAdjust verifies and adjusts the replica configuration.
func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri
enableRedoIOCheck := true
if c.EnableRedoIOCheck != nil {
enableRedoIOCheck = *c.EnableRedoIOCheck
}

if c.Sink != nil {
err := c.Sink.validateAndAdjust(sinkURI)
if err != nil {
Expand All @@ -275,7 +287,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
}

if c.Consistent != nil {
err := c.Consistent.ValidateAndAdjust()
err := c.Consistent.validateAndAdjust(enableRedoIOCheck)
if err != nil {
return err
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,29 @@ func TestReplicaConfig_EnableSplittableCheck_DefaultValue(t *testing.T) {
require.NotNil(t, config.Scheduler)
require.False(t, util.GetOrZero(config.Scheduler.EnableSplittableCheck))
}

func TestReplicaConfig_EnableRedoIOCheck_DefaultValue(t *testing.T) {
config := GetDefaultReplicaConfig()
require.True(t, util.GetOrZero(config.EnableRedoIOCheck))
}

func TestReplicaConfig_EnableRedoIOCheck_DefaultEnabled(t *testing.T) {
config := GetDefaultReplicaConfig()
config.Consistent.Level = util.AddressOf("eventual")
config.Consistent.Storage = util.AddressOf("s3:///redo-test-no-bucket")

sinkURI, err := url.Parse("blackhole://")
require.NoError(t, err)
require.Error(t, config.ValidateAndAdjust(sinkURI))
}

func TestReplicaConfig_EnableRedoIOCheck_CanDisableForCLI(t *testing.T) {
config := GetDefaultReplicaConfig()
config.EnableRedoIOCheck = util.AddressOf(false)
config.Consistent.Level = util.AddressOf("eventual")
config.Consistent.Storage = util.AddressOf("s3:///redo-test-no-bucket")

sinkURI, err := url.Parse("blackhole://")
require.NoError(t, err)
require.NoError(t, config.ValidateAndAdjust(sinkURI))
}
2 changes: 2 additions & 0 deletions pkg/migrate/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func TestMigration(t *testing.T) {
}
status2 := config.ChangeFeedStatus{CheckpointTs: 2}
cfg := config.GetDefaultReplicaConfig()
// This internal field is not persisted in changefeed metadata.
cfg.EnableRedoIOCheck = nil
cfg.CheckGCSafePoint = util.AddressOf(false)
cfg.Sink = &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
Expand Down
16 changes: 16 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,31 @@ func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.Exter
return s, nil
}

// StorageValidationOptions controls whether ValidateStorage performs
// an I/O based accessibility check.
type StorageValidationOptions struct {
EnableIOCheck bool
}

// ValidateStorage validates the storage used by redo.
func ValidateStorage(uri *url.URL) error {
return ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
}

// ValidateStorageWithOptions validates the storage used by redo with options.
//
// When EnableIOCheck is false, only basic scheme validation is performed.
func ValidateStorageWithOptions(uri *url.URL, opts StorageValidationOptions) error {
scheme := uri.Scheme
if !IsValidConsistentStorage(scheme) {
return errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
}
if IsBlackholeStorage(scheme) {
return nil
}
if !opts.EnableIOCheck {
return nil
}

if IsExternalStorage(scheme) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down
13 changes: 13 additions & 0 deletions pkg/redo/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,16 @@ func TestInitExternalStorage(t *testing.T) {
require.NoError(t, err)
}
}

func TestValidateStorageWithOptionsSkipIOCheck(t *testing.T) {
t.Parallel()

uri, err := storage.ParseRawURL("s3:///redo-test-no-bucket")
require.NoError(t, err)

err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: false})
require.NoError(t, err)

err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
require.Error(t, err)
}
39 changes: 20 additions & 19 deletions pkg/redo/writer/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,31 +303,32 @@ func (w *Writer) encode(ctx context.Context) error {
cacheEventPostFlush = cacheEventPostFlush[:0]
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
err := flush()
if err != nil {
return errors.Trace(err)
}
case e := <-w.inputCh:
err := w.write(e)
if err != nil {
return err
}
num++
if num > redo.DefaultFlushBatchSize {
for {
select {
case <-ctx.Done():
return ctx.Err()
Comment on lines +308 to +309
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Flush pending callbacks before exiting on context cancellation.

Returning immediately on ctx.Done() drops queued cacheEventPostFlush callbacks for already-written events. Please flush pending batch work before exit.

🔧 Proposed fix
-		case <-ctx.Done():
-			return ctx.Err()
+		case <-ctx.Done():
+			if num > 0 {
+				if err := flush(); err != nil {
+					return errors.Trace(err)
+				}
+			}
+			return ctx.Err()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case <-ctx.Done():
return ctx.Err()
case <-ctx.Done():
if num > 0 {
if err := flush(); err != nil {
return errors.Trace(err)
}
}
return ctx.Err()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/file/file.go` around lines 308 - 309, In the select branch
that handles <-ctx.Done() (where the code currently returns ctx.Err()), ensure
you flush any pending batch work and run queued cacheEventPostFlush callbacks
before returning: call the existing batch-flush/drain routine (the function that
processes the in-flight/write batch and invokes cacheEventPostFlush callbacks)
to drain and execute all pending callbacks, wait for it to complete or time out
appropriately, and only then return ctx.Err(); update the <-ctx.Done() case to
perform that flush/drain step instead of returning immediately.

case <-ticker.C:
err := flush()
if err != nil {
return errors.Trace(err)
}
e.PostFlush()
} else {
cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
case e := <-w.inputCh:
err := w.write(e)
if err != nil {
return err
}
num++
if num >= redo.DefaultFlushBatchSize {
err := flush()
if err != nil {
return errors.Trace(err)
}
e.PostFlush()
} else {
cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
}
}
}
return nil
}

func (w *Writer) close(ctx context.Context) error {
Expand Down
65 changes: 65 additions & 0 deletions pkg/redo/writer/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"os"
"path/filepath"
"testing"
"time"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/ticdc/pkg/common"
pevent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/fsutil"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/redo"
Expand Down Expand Up @@ -401,3 +404,65 @@ func TestRotateFileWithoutFileAllocator(t *testing.T) {

w.Close()
}

func TestRunFlushesOnBatchBoundaryAndExecutesPostFlush(t *testing.T) {
t.Parallel()

dir := t.TempDir()
flushIntervalInMs := int64(60 * 1000)
flushWorkerNum := 9
w, err := NewFileWriter(context.Background(), &writer.LogWriterConfig{
ConsistentConfig: config.ConsistentConfig{
FlushIntervalInMs: &flushIntervalInMs,
FlushWorkerNum: &flushWorkerNum,
},
Dir: dir,
CaptureID: "cp",
ChangeFeedID: common.NewChangeFeedIDWithName("test-run-batch", common.DefaultKeyspaceName),
MaxLogSizeInBytes: redo.DefaultMaxLogSize * redo.Megabyte,
}, redo.RedoRowLogFileType)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
runErrCh := make(chan error, 1)
go func() {
runErrCh <- w.Run(ctx)
}()

postFlushCnt := atomic.NewInt64(0)
for i := 0; i < redo.DefaultFlushBatchSize-1; i++ {
ts := uint64(i + 1)
w.GetInputCh() <- &pevent.RedoRowEvent{
StartTs: ts,
CommitTs: ts,
Callback: func() {
postFlushCnt.Inc()
},
}
}

// The callback should not be executed before the batch reaches the boundary.
require.Equal(t, int64(0), postFlushCnt.Load())
select {
case err := <-runErrCh:
require.Failf(t, "run exited unexpectedly", "run returned before cancel: %v", err)
default:
}

ts := uint64(redo.DefaultFlushBatchSize)
w.GetInputCh() <- &pevent.RedoRowEvent{
StartTs: ts,
CommitTs: ts,
Callback: func() {
postFlushCnt.Inc()
},
}

require.Eventually(t, func() bool {
return postFlushCnt.Load() == int64(redo.DefaultFlushBatchSize)
}, 10*time.Second, 20*time.Millisecond)

cancel()
require.ErrorIs(t, <-runErrCh, context.Canceled)
require.NoError(t, w.Close())
}