Skip to content
Merged
5 changes: 3 additions & 2 deletions cmd/cdc/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func (o *createChangefeedOptions) completeReplicaCfg() error {
if err != nil {
return err
}

err = cfg.ValidateAndAdjust(uri)
err = cfg.ValidateAndAdjustWithOptions(uri, config.ValidateOptions{
EnableRedoIOCheck: false,
})
if err != nil {
return err
}
Expand Down
45 changes: 45 additions & 0 deletions cmd/cdc/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,48 @@ func TestChangefeedCreateCli(t *testing.T) {
require.NoError(t, o.complete(f))
require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`")
}

func TestCompleteReplicaCfgSkipConsistentStorageIOCheckInCLI(t *testing.T) {
	t.Parallel()

	opts := newCreateChangefeedOptions(newChangefeedCommonOptions())
	opts.commonChangefeedOptions.sinkURI = "blackhole://"

	// Write a changefeed config whose redo storage URI is an S3 path
	// with no bucket — an I/O accessibility probe would reject it.
	cfgPath := filepath.Join(t.TempDir(), "cf.toml")
	cfgText := `
[consistent]
level = "eventual"
storage = "s3:///test/prefix"
`
	require.NoError(t, os.WriteFile(cfgPath, []byte(cfgText), 0o644))
	opts.commonChangefeedOptions.configFile = cfgPath

	// The CLI still validates replica config, but it skips storage I/O check.
	// Therefore this should pass even if the S3 URI misses bucket info.
	require.NoError(t, opts.completeReplicaCfg())
	require.Equal(t, "eventual", *opts.cfg.Consistent.Level)
	require.Equal(t, "s3:///test/prefix", *opts.cfg.Consistent.Storage)
}

func TestCompleteReplicaCfgStillValidateReplicaConfigInCLI(t *testing.T) {
	t.Parallel()

	opts := newCreateChangefeedOptions(newChangefeedCommonOptions())
	opts.commonChangefeedOptions.sinkURI = "blackhole://"

	// The compression value below is one the validator rejects, so replica
	// config validation must still fail even though the CLI skips the
	// storage I/O check.
	cfgPath := filepath.Join(t.TempDir(), "cf.toml")
	cfgText := `
[consistent]
level = "eventual"
storage = "nfs:///ticdc-cli-should-not-io-check"
compression = "snappy"
`
	require.NoError(t, os.WriteFile(cfgPath, []byte(cfgText), 0o644))
	opts.commonChangefeedOptions.configFile = cfgPath

	err := opts.completeReplicaCfg()
	require.Error(t, err)
	require.Contains(t, err.Error(), "consistent.compression")
}
6 changes: 5 additions & 1 deletion pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type ConsistentMemoryUsage struct {

// ValidateAndAdjust validates the consistency config and adjusts it if
// necessary. This variant keeps the storage I/O accessibility check enabled;
// callers that need to skip it go through validateAndAdjust directly.
func (c *ConsistentConfig) ValidateAndAdjust() error {
	const enableIOCheck = true // full validation, including storage probing
	return c.validateAndAdjust(enableIOCheck)
}

func (c *ConsistentConfig) validateAndAdjust(enableIOCheck bool) error {
if !redo.IsConsistentEnabled(util.GetOrZero(c.Level)) {
return nil
}
Expand Down Expand Up @@ -116,7 +120,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
fmt.Sprintf("invalid storage uri: %s", util.GetOrZero(c.Storage)))
}
return redo.ValidateStorage(uri)
return redo.ValidateStorageWithOptions(uri, redo.StorageValidationOptions{EnableIOCheck: enableIOCheck})
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
Expand Down
19 changes: 18 additions & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,25 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {
}
}

// ValidateOptions provides optional controls for
// (*ReplicaConfig).ValidateAndAdjustWithOptions.
type ValidateOptions struct {
	// EnableRedoIOCheck controls whether redo (consistent) storage validation
	// performs an I/O based accessibility check in addition to scheme checks.
	// The CLI disables it so changefeed creation does not probe storage.
	EnableRedoIOCheck bool
}

// ValidateAndAdjust verifies and adjusts the replica configuration. It is the
// default-options form of ValidateAndAdjustWithOptions, keeping the redo
// storage I/O check enabled.
func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri
	opts := ValidateOptions{EnableRedoIOCheck: true}
	return c.ValidateAndAdjustWithOptions(sinkURI, opts)
}

// ValidateAndAdjustWithOptions verifies and adjusts the replica configuration
// with extra controls.
func (c *ReplicaConfig) ValidateAndAdjustWithOptions(
sinkURI *url.URL, opts ValidateOptions,
) error {
if c.Sink != nil {
err := c.Sink.validateAndAdjust(sinkURI)
if err != nil {
Expand All @@ -275,7 +292,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
}

if c.Consistent != nil {
err := c.Consistent.ValidateAndAdjust()
err := c.Consistent.validateAndAdjust(opts.EnableRedoIOCheck)
if err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,31 @@ func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.Exter
return s, nil
}

// StorageValidationOptions controls whether ValidateStorage performs
// an I/O based accessibility check.
type StorageValidationOptions struct {
	// EnableIOCheck, when true, probes the storage for accessibility after
	// the scheme check. When false, only basic scheme validation runs.
	EnableIOCheck bool
}

// ValidateStorage validates the storage used by redo.
func ValidateStorage(uri *url.URL) error {
return ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
}

// ValidateStorageWithOptions validates the storage used by redo with options.
//
// When EnableIOCheck is false, only basic scheme validation is performed.
func ValidateStorageWithOptions(uri *url.URL, opts StorageValidationOptions) error {
scheme := uri.Scheme
if !IsValidConsistentStorage(scheme) {
return errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
}
if IsBlackholeStorage(scheme) {
return nil
}
if !opts.EnableIOCheck {
return nil
}

if IsExternalStorage(scheme) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down
13 changes: 13 additions & 0 deletions pkg/redo/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,16 @@ func TestInitExternalStorage(t *testing.T) {
require.NoError(t, err)
}
}

func TestValidateStorageWithOptionsSkipIOCheck(t *testing.T) {
	t.Parallel()

	// An S3 URI without a bucket: scheme-only validation accepts it,
	// while the I/O accessibility check must reject it.
	uri, err := storage.ParseRawURL("s3:///redo-test-no-bucket")
	require.NoError(t, err)

	require.NoError(t,
		ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: false}))
	require.Error(t,
		ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true}))
}
39 changes: 20 additions & 19 deletions pkg/redo/writer/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,31 +303,32 @@ func (w *Writer) encode(ctx context.Context) error {
cacheEventPostFlush = cacheEventPostFlush[:0]
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
err := flush()
if err != nil {
return errors.Trace(err)
}
case e := <-w.inputCh:
err := w.write(e)
if err != nil {
return err
}
num++
if num > redo.DefaultFlushBatchSize {
for {
select {
case <-ctx.Done():
return ctx.Err()
Comment on lines +308 to +309
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Flush pending callbacks before exiting on context cancellation.

Returning immediately on ctx.Done() drops queued cacheEventPostFlush callbacks for already-written events. Please flush pending batch work before exit.

🔧 Proposed fix
-		case <-ctx.Done():
-			return ctx.Err()
+		case <-ctx.Done():
+			if num > 0 {
+				if err := flush(); err != nil {
+					return errors.Trace(err)
+				}
+			}
+			return ctx.Err()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case <-ctx.Done():
return ctx.Err()
case <-ctx.Done():
if num > 0 {
if err := flush(); err != nil {
return errors.Trace(err)
}
}
return ctx.Err()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/redo/writer/file/file.go` around lines 308 - 309, In the select branch
that handles <-ctx.Done() (where the code currently returns ctx.Err()), ensure
you flush any pending batch work and run queued cacheEventPostFlush callbacks
before returning: call the existing batch-flush/drain routine (the function that
processes the in-flight/write batch and invokes cacheEventPostFlush callbacks)
to drain and execute all pending callbacks, wait for it to complete or time out
appropriately, and only then return ctx.Err(); update the <-ctx.Done() case to
perform that flush/drain step instead of returning immediately.

case <-ticker.C:
err := flush()
if err != nil {
return errors.Trace(err)
}
e.PostFlush()
} else {
cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
case e := <-w.inputCh:
err := w.write(e)
if err != nil {
return err
}
num++
if num >= redo.DefaultFlushBatchSize {
err := flush()
if err != nil {
return errors.Trace(err)
}
e.PostFlush()
} else {
cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
}
}
}
return nil
}

func (w *Writer) close(ctx context.Context) error {
Expand Down
65 changes: 65 additions & 0 deletions pkg/redo/writer/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"os"
"path/filepath"
"testing"
"time"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/ticdc/pkg/common"
pevent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/fsutil"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/redo"
Expand Down Expand Up @@ -401,3 +404,65 @@ func TestRotateFileWithoutFileAllocator(t *testing.T) {

w.Close()
}

// TestRunFlushesOnBatchBoundaryAndExecutesPostFlush checks that Run only
// executes event PostFlush callbacks once the input batch reaches
// redo.DefaultFlushBatchSize, and that all queued callbacks fire on that
// boundary flush.
func TestRunFlushesOnBatchBoundaryAndExecutesPostFlush(t *testing.T) {
	t.Parallel()

	dir := t.TempDir()
	// A 60s flush interval makes the periodic ticker effectively inert for
	// this test, so only the batch-size boundary can trigger a flush.
	flushIntervalInMs := int64(60 * 1000)
	flushWorkerNum := 9
	w, err := NewFileWriter(context.Background(), &writer.LogWriterConfig{
		ConsistentConfig: config.ConsistentConfig{
			FlushIntervalInMs: &flushIntervalInMs,
			FlushWorkerNum:    &flushWorkerNum,
		},
		Dir:               dir,
		CaptureID:         "cp",
		ChangeFeedID:      common.NewChangeFeedIDWithName("test-run-batch", common.DefaultKeyspaceName),
		MaxLogSizeInBytes: redo.DefaultMaxLogSize * redo.Megabyte,
	}, redo.RedoRowLogFileType)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	runErrCh := make(chan error, 1)
	go func() {
		runErrCh <- w.Run(ctx)
	}()

	// Feed one event fewer than the batch boundary; none of their callbacks
	// should run yet.
	postFlushCnt := atomic.NewInt64(0)
	for i := 0; i < redo.DefaultFlushBatchSize-1; i++ {
		ts := uint64(i + 1)
		w.GetInputCh() <- &pevent.RedoRowEvent{
			StartTs:  ts,
			CommitTs: ts,
			Callback: func() {
				postFlushCnt.Inc()
			},
		}
	}

	// The callback should not be executed before the batch reaches the boundary.
	require.Equal(t, int64(0), postFlushCnt.Load())
	// Non-blocking probe: Run must still be alive at this point.
	select {
	case err := <-runErrCh:
		require.Failf(t, "run exited unexpectedly", "run returned before cancel: %v", err)
	default:
	}

	// The final event completes the batch and should trigger a flush that
	// executes every queued PostFlush callback.
	ts := uint64(redo.DefaultFlushBatchSize)
	w.GetInputCh() <- &pevent.RedoRowEvent{
		StartTs:  ts,
		CommitTs: ts,
		Callback: func() {
			postFlushCnt.Inc()
		},
	}

	require.Eventually(t, func() bool {
		return postFlushCnt.Load() == int64(redo.DefaultFlushBatchSize)
	}, 10*time.Second, 20*time.Millisecond)

	// Cancellation must propagate out of Run as context.Canceled.
	cancel()
	require.ErrorIs(t, <-runErrCh, context.Canceled)
	require.NoError(t, w.Close())
}
}