Skip to content

Commit 9c02993

Browse files
authored
*: move redo s3 check from cli to server (#4281)
close #4122
1 parent 29db6eb commit 9c02993

File tree

11 files changed

+214
-22
lines changed

11 files changed

+214
-22
lines changed

cmd/cdc/cli/cli_changefeed_create.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,9 @@ func (o *createChangefeedOptions) completeReplicaCfg() error {
155155
if err != nil {
156156
return err
157157
}
158-
158+
// CLI create runs pre-validation on the local machine, so skip redo
159+
// storage I/O checks here and keep validation in cdc server.
160+
cfg.EnableRedoIOCheck = putil.AddressOf(false)
159161
err = cfg.ValidateAndAdjust(uri)
160162
if err != nil {
161163
return err

cmd/cdc/cli/cli_changefeed_create_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,48 @@ func TestChangefeedCreateCli(t *testing.T) {
173173
require.NoError(t, o.complete(f))
174174
require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`")
175175
}
176+
177+
func TestCompleteReplicaCfgSkipConsistentStorageIOCheckInCLI(t *testing.T) {
178+
t.Parallel()
179+
180+
o := newCreateChangefeedOptions(newChangefeedCommonOptions())
181+
o.commonChangefeedOptions.sinkURI = "blackhole://"
182+
183+
dir := t.TempDir()
184+
configPath := filepath.Join(dir, "cf.toml")
185+
content := `
186+
[consistent]
187+
level = "eventual"
188+
storage = "s3:///test/prefix"
189+
`
190+
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
191+
o.commonChangefeedOptions.configFile = configPath
192+
193+
// The CLI still validates replica config, but it skips storage I/O check.
194+
// Therefore this should pass even if the S3 URI misses bucket info.
195+
require.NoError(t, o.completeReplicaCfg())
196+
require.Equal(t, "eventual", *o.cfg.Consistent.Level)
197+
require.Equal(t, "s3:///test/prefix", *o.cfg.Consistent.Storage)
198+
}
199+
200+
func TestCompleteReplicaCfgStillValidateReplicaConfigInCLI(t *testing.T) {
201+
t.Parallel()
202+
203+
o := newCreateChangefeedOptions(newChangefeedCommonOptions())
204+
o.commonChangefeedOptions.sinkURI = "blackhole://"
205+
206+
dir := t.TempDir()
207+
configPath := filepath.Join(dir, "cf.toml")
208+
content := `
209+
[consistent]
210+
level = "eventual"
211+
storage = "nfs:///ticdc-cli-should-not-io-check"
212+
compression = "snappy"
213+
`
214+
require.NoError(t, os.WriteFile(configPath, []byte(content), 0o644))
215+
o.commonChangefeedOptions.configFile = configPath
216+
217+
err := o.completeReplicaCfg()
218+
require.Error(t, err)
219+
require.Contains(t, err.Error(), "consistent.compression")
220+
}

coordinator/coordinator_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,8 @@ func TestScaleNode(t *testing.T) {
378378
etcdClient := newMockEtcdClient(string(info.ID))
379379
nodeManager := watcher.NewNodeManager(nil, etcdClient)
380380
appcontext.SetService(watcher.NodeManagerName, nodeManager)
381+
appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())
382+
appcontext.SetService(appcontext.SchemaStore, eventservice.NewMockSchemaStore())
381383
nodeManager.GetAliveNodes()[info.ID] = info
382384
cfg := config.NewDefaultMessageCenterConfig(info.AdvertiseAddr)
383385
mc1 := messaging.NewMessageCenter(ctx, info.ID, cfg, nil)

pkg/config/consistent.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,15 @@ type ConsistentMemoryUsage struct {
7272
}
7373

7474
// ValidateAndAdjust validates the consistency config and adjusts it if necessary.
75+
// The exported API keeps the default behavior and always enables redo storage
76+
// I/O checks for normal callers.
7577
func (c *ConsistentConfig) ValidateAndAdjust() error {
78+
return c.validateAndAdjust(true)
79+
}
80+
81+
// validateAndAdjust is an internal helper that allows toggling redo storage
82+
// I/O checks. enableIOCheck=false is only used by CLI-side pre-validation.
83+
func (c *ConsistentConfig) validateAndAdjust(enableIOCheck bool) error {
7684
if !redo.IsConsistentEnabled(util.GetOrZero(c.Level)) {
7785
return nil
7886
}
@@ -116,7 +124,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
116124
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
117125
fmt.Sprintf("invalid storage uri: %s", util.GetOrZero(c.Storage)))
118126
}
119-
return redo.ValidateStorage(uri)
127+
return redo.ValidateStorageWithOptions(uri, redo.StorageValidationOptions{EnableIOCheck: enableIOCheck})
120128
}
121129

122130
// MaskSensitiveData masks sensitive data in ConsistentConfig

pkg/config/replica_config.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ var defaultReplicaConfig = &ReplicaConfig{
4747
MemoryQuota: util.AddressOf(uint64(DefaultChangefeedMemoryQuota)),
4848
CaseSensitive: util.AddressOf(false),
4949
CheckGCSafePoint: util.AddressOf(true),
50+
EnableRedoIOCheck: util.AddressOf(true),
5051
EnableSyncPoint: util.AddressOf(false),
5152
EnableTableMonitor: util.AddressOf(false),
5253
SyncPointInterval: util.AddressOf(10 * time.Minute),
@@ -146,6 +147,9 @@ type replicaConfig struct {
146147
CaseSensitive *bool `toml:"case-sensitive" json:"case-sensitive,omitempty"`
147148
ForceReplicate *bool `toml:"force-replicate" json:"force-replicate,omitempty"`
148149
CheckGCSafePoint *bool `toml:"check-gc-safe-point" json:"check-gc-safe-point,omitempty"`
150+
// EnableRedoIOCheck controls whether consistency storage validation should
151+
// perform an I/O accessibility check. This field is internal only.
152+
EnableRedoIOCheck *bool `toml:"-" json:"-"`
149153
// EnableSyncPoint is only available when the downstream is a Database.
150154
EnableSyncPoint *bool `toml:"enable-sync-point" json:"enable-sync-point,omitempty"`
151155
EnableTableMonitor *bool `toml:"enable-table-monitor" json:"enable-table-monitor"`
@@ -250,6 +254,9 @@ func (c *ReplicaConfig) Clone() *ReplicaConfig {
250254
log.Panic("failed to unmarshal replica config",
251255
zap.Error(cerror.WrapError(cerror.ErrDecodeFailed, err)))
252256
}
257+
if c.EnableRedoIOCheck != nil {
258+
clone.EnableRedoIOCheck = util.AddressOf(*c.EnableRedoIOCheck)
259+
}
253260
return clone
254261
}
255262

@@ -267,6 +274,11 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {
267274

268275
// ValidateAndAdjust verifies and adjusts the replica configuration.
269276
func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri
277+
enableRedoIOCheck := true
278+
if c.EnableRedoIOCheck != nil {
279+
enableRedoIOCheck = *c.EnableRedoIOCheck
280+
}
281+
270282
if c.Sink != nil {
271283
err := c.Sink.validateAndAdjust(sinkURI)
272284
if err != nil {
@@ -275,7 +287,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
275287
}
276288

277289
if c.Consistent != nil {
278-
err := c.Consistent.ValidateAndAdjust()
290+
err := c.Consistent.validateAndAdjust(enableRedoIOCheck)
279291
if err != nil {
280292
return err
281293
}

pkg/config/replica_config_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,29 @@ func TestReplicaConfig_EnableSplittableCheck_DefaultValue(t *testing.T) {
193193
require.NotNil(t, config.Scheduler)
194194
require.False(t, util.GetOrZero(config.Scheduler.EnableSplittableCheck))
195195
}
196+
197+
func TestReplicaConfig_EnableRedoIOCheck_DefaultValue(t *testing.T) {
198+
config := GetDefaultReplicaConfig()
199+
require.True(t, util.GetOrZero(config.EnableRedoIOCheck))
200+
}
201+
202+
func TestReplicaConfig_EnableRedoIOCheck_DefaultEnabled(t *testing.T) {
203+
config := GetDefaultReplicaConfig()
204+
config.Consistent.Level = util.AddressOf("eventual")
205+
config.Consistent.Storage = util.AddressOf("s3:///redo-test-no-bucket")
206+
207+
sinkURI, err := url.Parse("blackhole://")
208+
require.NoError(t, err)
209+
require.Error(t, config.ValidateAndAdjust(sinkURI))
210+
}
211+
212+
func TestReplicaConfig_EnableRedoIOCheck_CanDisableForCLI(t *testing.T) {
213+
config := GetDefaultReplicaConfig()
214+
config.EnableRedoIOCheck = util.AddressOf(false)
215+
config.Consistent.Level = util.AddressOf("eventual")
216+
config.Consistent.Storage = util.AddressOf("s3:///redo-test-no-bucket")
217+
218+
sinkURI, err := url.Parse("blackhole://")
219+
require.NoError(t, err)
220+
require.NoError(t, config.ValidateAndAdjust(sinkURI))
221+
}

pkg/migrate/migrate_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ func TestMigration(t *testing.T) {
8787
}
8888
status2 := config.ChangeFeedStatus{CheckpointTs: 2}
8989
cfg := config.GetDefaultReplicaConfig()
90+
// This internal field is not persisted in changefeed metadata.
91+
cfg.EnableRedoIOCheck = nil
9092
cfg.CheckGCSafePoint = util.AddressOf(false)
9193
cfg.Sink = &config.SinkConfig{
9294
DispatchRules: []*config.DispatchRule{

pkg/redo/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,15 +214,31 @@ func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.Exter
214214
return s, nil
215215
}
216216

217+
// StorageValidationOptions controls whether ValidateStorage performs
218+
// an I/O based accessibility check.
219+
type StorageValidationOptions struct {
220+
EnableIOCheck bool
221+
}
222+
217223
// ValidateStorage validates the storage used by redo.
218224
func ValidateStorage(uri *url.URL) error {
225+
return ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
226+
}
227+
228+
// ValidateStorageWithOptions validates the storage used by redo with options.
229+
//
230+
// When EnableIOCheck is false, only basic scheme validation is performed.
231+
func ValidateStorageWithOptions(uri *url.URL, opts StorageValidationOptions) error {
219232
scheme := uri.Scheme
220233
if !IsValidConsistentStorage(scheme) {
221234
return errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
222235
}
223236
if IsBlackholeStorage(scheme) {
224237
return nil
225238
}
239+
if !opts.EnableIOCheck {
240+
return nil
241+
}
226242

227243
if IsExternalStorage(scheme) {
228244
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

pkg/redo/config_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,16 @@ func TestInitExternalStorage(t *testing.T) {
187187
require.NoError(t, err)
188188
}
189189
}
190+
191+
func TestValidateStorageWithOptionsSkipIOCheck(t *testing.T) {
192+
t.Parallel()
193+
194+
uri, err := storage.ParseRawURL("s3:///redo-test-no-bucket")
195+
require.NoError(t, err)
196+
197+
err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: false})
198+
require.NoError(t, err)
199+
200+
err = ValidateStorageWithOptions(uri, StorageValidationOptions{EnableIOCheck: true})
201+
require.Error(t, err)
202+
}

pkg/redo/writer/file/file.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -303,31 +303,32 @@ func (w *Writer) encode(ctx context.Context) error {
303303
cacheEventPostFlush = cacheEventPostFlush[:0]
304304
return nil
305305
}
306-
select {
307-
case <-ctx.Done():
308-
return ctx.Err()
309-
case <-ticker.C:
310-
err := flush()
311-
if err != nil {
312-
return errors.Trace(err)
313-
}
314-
case e := <-w.inputCh:
315-
err := w.write(e)
316-
if err != nil {
317-
return err
318-
}
319-
num++
320-
if num > redo.DefaultFlushBatchSize {
306+
for {
307+
select {
308+
case <-ctx.Done():
309+
return ctx.Err()
310+
case <-ticker.C:
321311
err := flush()
322312
if err != nil {
323313
return errors.Trace(err)
324314
}
325-
e.PostFlush()
326-
} else {
327-
cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
315+
case e := <-w.inputCh:
316+
err := w.write(e)
317+
if err != nil {
318+
return err
319+
}
320+
num++
321+
if num >= redo.DefaultFlushBatchSize {
322+
err := flush()
323+
if err != nil {
324+
return errors.Trace(err)
325+
}
326+
e.PostFlush()
327+
} else {
328+
cacheEventPostFlush = append(cacheEventPostFlush, e.PostFlush)
329+
}
328330
}
329331
}
330-
return nil
331332
}
332333

333334
func (w *Writer) close(ctx context.Context) error {

0 commit comments

Comments
 (0)