Skip to content
Merged
5 changes: 3 additions & 2 deletions cmd/cdc/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func (o *createChangefeedOptions) completeReplicaCfg() error {
if err != nil {
return err
}

err = cfg.ValidateAndAdjust(uri)
err = cfg.ValidateAndAdjustWithOptions(uri, config.ValidateOptions{
EnableRedoIOCheck: false,
})
if err != nil {
return err
}
Expand Down
45 changes: 45 additions & 0 deletions cmd/cdc/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,48 @@ func TestChangefeedCreateCli(t *testing.T) {
require.NoError(t, o.complete(f))
require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`")
}

func TestCompleteReplicaCfgSkipConsistentStorageIOCheckInCLI(t *testing.T) {
t.Parallel()

o := newCreateChangefeedOptions(newChangefeedCommonOptions())
o.commonChangefeedOptions.sinkURI = "blackhole://"

dir := t.TempDir()
configPath := filepath.Join(dir, "cf.toml")
content := `
[consistent]
level = "eventual"
storage = "s3:///test/prefix"
`
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
o.commonChangefeedOptions.configFile = configPath

// The CLI still validates replica config, but it skips storage I/O check.
// Therefore this should pass even if the S3 URI misses bucket info.
require.NoError(t, o.completeReplicaCfg())
require.Equal(t, "eventual", *o.cfg.Consistent.Level)
require.Equal(t, "s3:///test/prefix", *o.cfg.Consistent.Storage)
}

func TestCompleteReplicaCfgStillValidateReplicaConfigInCLI(t *testing.T) {
t.Parallel()

o := newCreateChangefeedOptions(newChangefeedCommonOptions())
o.commonChangefeedOptions.sinkURI = "blackhole://"

dir := t.TempDir()
configPath := filepath.Join(dir, "cf.toml")
content := `
[consistent]
level = "eventual"
storage = "nfs:///ticdc-cli-should-not-io-check"
compression = "snappy"
`
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
o.commonChangefeedOptions.configFile = configPath

err := o.completeReplicaCfg()
require.Error(t, err)
require.Contains(t, err.Error(), "consistent.compression")
}
6 changes: 5 additions & 1 deletion pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type ConsistentMemoryUsage struct {

// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
func (c *ConsistentConfig) ValidateAndAdjust() error {
return c.validateAndAdjust(true)
}

func (c *ConsistentConfig) validateAndAdjust(enableIOCheck bool) error {
if !redo.IsConsistentEnabled(util.GetOrZero(c.Level)) {
return nil
}
Expand Down Expand Up @@ -116,7 +120,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
fmt.Sprintf("invalid storage uri: %s", util.GetOrZero(c.Storage)))
}
return redo.ValidateStorage(uri)
return redo.ValidateStorageWithOptions(uri, redo.StorageValidationOptions{EnableIOCheck: enableIOCheck})
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
Expand Down
19 changes: 18 additions & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,25 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {
}
}

// ValidateOptions provides optional controls for
// (*ReplicaConfig).ValidateAndAdjustWithOptions.
type ValidateOptions struct {
EnableRedoIOCheck bool
}

// ValidateAndAdjust verifies and adjusts the replica configuration.
func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri
return c.ValidateAndAdjustWithOptions(
sinkURI,
ValidateOptions{EnableRedoIOCheck: true},
)
}

// ValidateAndAdjustWithOptions verifies and adjusts the replica configuration
// with extra controls.
func (c *ReplicaConfig) ValidateAndAdjustWithOptions(
sinkURI *url.URL, opts ValidateOptions,
) error {
if c.Sink != nil {
err := c.Sink.validateAndAdjust(sinkURI)
if err != nil {
Expand All @@ -275,7 +292,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
}

if c.Consistent != nil {
err := c.Consistent.ValidateAndAdjust()
err := c.Consistent.validateAndAdjust(opts.EnableRedoIOCheck)
if err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,31 @@ func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.Exter
return s, nil
}

// StorageValidationOptions controls whether ValidateStorage performs
// an I/O based accessibility check.
type StorageValidationOptions struct {
EnableIOCheck bool
}

// ValidateStorage validates the storage used by redo.
func ValidateStorage(uri *url.URL) error {
return ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
}

// ValidateStorageWithOptions validates the storage used by redo with options.
//
// When EnableIOCheck is false, only basic scheme validation is performed.
func ValidateStorageWithOptions(uri *url.URL, opts StorageValidationOptions) error {
scheme := uri.Scheme
if !IsValidConsistentStorage(scheme) {
return errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
}
if IsBlackholeStorage(scheme) {
return nil
}
if !opts.EnableIOCheck {
return nil
}

if IsExternalStorage(scheme) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down
13 changes: 13 additions & 0 deletions pkg/redo/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,16 @@ func TestInitExternalStorage(t *testing.T) {
require.NoError(t, err)
}
}

func TestValidateStorageWithOptionsSkipIOCheck(t *testing.T) {
t.Parallel()

uri, err := storage.ParseRawURL("s3:///redo-test-no-bucket")
require.NoError(t, err)

err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: false})
require.NoError(t, err)

err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
require.Error(t, err)
}
Loading