3 changes: 3 additions & 0 deletions api/v2/model.go
@@ -541,6 +541,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
SafeMode: c.Sink.SafeMode,
OpenProtocol: openProtocolConfig,
Debezium: debeziumConfig,
UseTableIDAsPath: c.Sink.UseTableIDAsPath,
}

if c.Sink.TxnAtomicity != nil {
@@ -896,6 +897,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SafeMode: cloned.Sink.SafeMode,
DebeziumConfig: debeziumConfig,
OpenProtocolConfig: openProtocolConfig,
UseTableIDAsPath: cloned.Sink.UseTableIDAsPath,
}

if cloned.Sink.TxnAtomicity != nil {
@@ -1156,6 +1158,7 @@ type SinkConfig struct {
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty"`
}

// CSVConfig denotes the csv config
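For reference, a minimal, runnable sketch of the JSON wire shape the new `use_table_id_as_path` field implies. The `sinkConfig` type here is an illustrative mirror of the tag shown in the diff, not the real `api/v2.SinkConfig`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative mirror of the new api/v2 field; the real SinkConfig
// lives in api/v2/model.go and carries many more fields.
type sinkConfig struct {
	UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty"`
}

func main() {
	enabled := true
	out, err := json.Marshal(sinkConfig{UseTableIDAsPath: &enabled})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"use_table_id_as_path":true}
}
```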
5 changes: 5 additions & 0 deletions api/v2/model_test.go
@@ -33,6 +33,9 @@ func TestReplicaConfigConversion(t *testing.T) {
EnableSyncPoint: util.AddressOf(true),
EnableTableMonitor: util.AddressOf(true),
BDRMode: util.AddressOf(true),
Sink: &SinkConfig{
UseTableIDAsPath: util.AddressOf(true),
},
Mounter: &MounterConfig{
WorkerNum: util.AddressOf(16),
},
@@ -61,6 +64,7 @@ func TestReplicaConfigConversion(t *testing.T) {
require.True(t, util.GetOrZero(internalCfg.EnableSyncPoint))
require.True(t, util.GetOrZero(internalCfg.EnableTableMonitor))
require.True(t, util.GetOrZero(internalCfg.BDRMode))
require.True(t, util.GetOrZero(internalCfg.Sink.UseTableIDAsPath))
require.Equal(t, internalCfg.Mounter.WorkerNum, *apiCfg.Mounter.WorkerNum)
require.True(t, util.GetOrZero(internalCfg.Scheduler.EnableTableAcrossNodes))
require.Equal(t, 1000, util.GetOrZero(internalCfg.Scheduler.RegionThreshold))
@@ -85,6 +89,7 @@ func TestReplicaConfigConversion(t *testing.T) {
require.True(t, *apiCfgBack.CaseSensitive)
require.True(t, *apiCfgBack.ForceReplicate)
require.True(t, *apiCfgBack.IgnoreIneligibleTable)
require.True(t, *apiCfgBack.Sink.UseTableIDAsPath)
require.Equal(t, 16, *apiCfgBack.Mounter.WorkerNum)
require.True(t, *apiCfgBack.Scheduler.EnableTableAcrossNodes)
require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel)
25 changes: 19 additions & 6 deletions downstreamadapter/sink/cloudstorage/sink.go
@@ -194,37 +194,50 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
// For exchange partition, we need to write the schema of the source table.
// write the previous table first
if event.GetDDLType() == model.ActionExchangeTablePartition {
if len(event.MultipleTableInfos) < 2 || event.MultipleTableInfos[1] == nil {
return errors.ErrInternalCheckFailed.GenWithStackByArgs(
"invalid exchange partition ddl event, source table info is missing")
}
sourceTableInfo := event.MultipleTableInfos[1]

var def cloudstorage.TableDefinition
def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID)
def.Query = event.Query
def.Type = event.Type
if err := s.writeFile(event, def); err != nil {
if err := s.writeFile(event, def, event.GetTableID()); err != nil {
return err
}
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef); err != nil {
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, sourceTableInfo, event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef, sourceTableInfo.TableName.TableID); err != nil {
return err
}
} else {
for _, e := range event.GetEvents() {
var def cloudstorage.TableDefinition
def.FromDDLEvent(e, s.cfg.OutputColumnID)
if err := s.writeFile(e, def); err != nil {
if err := s.writeFile(e, def, e.GetTableID()); err != nil {
return err
}
}
}
return nil
}

func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition, tableID int64) error {
// skip writing database-level events in 'use-table-id-as-path' mode
if s.cfg.UseTableIDAsPath && def.Table == "" {
log.Debug("skip database schema for table id path",
zap.String("schema", def.Schema),
zap.String("query", def.Query))
return nil
}
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, tableID)
if err != nil {
return errors.Trace(err)
}
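To make the path change concrete, here is a self-contained sketch of the two schema-file layouts the sink now toggles between. The exact format is owned by `GenerateSchemaFilePath` in `pkg/sink/cloudstorage`; the `<checksum>` placeholder and the legacy `{schema}/{table}` layout are assumptions inferred from the tests below:

```go
package main

import "fmt"

// Sketch of the two schema-file layouts this diff toggles between.
func schemaPath(useTableIDAsPath bool, schema, table string, tableID int64, ts uint64) string {
	if useTableIDAsPath {
		// The table ID replaces the schema/table directory pair, so
		// database-level DDLs (which have no table) have nowhere to
		// go and are skipped by writeFile above.
		return fmt.Sprintf("%d/meta/schema_%d_<checksum>.json", tableID, ts)
	}
	return fmt.Sprintf("%s/%s/meta/schema_%d_<checksum>.json", schema, table, ts)
}

func main() {
	fmt.Println(schemaPath(false, "test", "table1", 20, 100)) // test/table1/meta/schema_100_<checksum>.json
	fmt.Println(schemaPath(true, "test", "table1", 20, 100))  // 20/meta/schema_100_<checksum>.json
}
```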
155 changes: 155 additions & 0 deletions downstreamadapter/sink/cloudstorage/sink_test.go
@@ -207,6 +207,161 @@ func TestWriteDDLEvent(t *testing.T) {
}`, string(tableSchema))
}

func TestWriteDDLEventWithTableIDAsPath(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

go cloudStorageSink.Run(ctx)

tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
ID: 20,
Name: ast.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{
Name: ast.NewCIStr("col1"),
FieldType: *types.NewFieldType(mysql.TypeLong),
},
{
Name: ast.NewCIStr("col2"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
},
})
ddlEvent := &commonEvent.DDLEvent{
Query: "alter table test.table1 add col2 varchar(64)",
Type: byte(timodel.ActionAddColumn),
SchemaName: "test",
TableName: "table1",
FinishedTs: 100,
TableInfo: tableInfo,
}

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.NoError(t, err)

tableDir := path.Join(parentDir, "20/meta/")
tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json"))
require.NoError(t, err)
require.Contains(t, string(tableSchema), `"Table": "table1"`)
}

func TestSkipDatabaseSchemaWithTableIDAsPath(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

go cloudStorageSink.Run(ctx)

ddlEvent := &commonEvent.DDLEvent{
Query: "create database test_db",
Type: byte(timodel.ActionCreateSchema),
SchemaName: "test_db",
TableName: "",
FinishedTs: 100,
TableInfo: nil,
}

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.NoError(t, err)

_, err = os.Stat(path.Join(parentDir, "test_db"))
require.Error(t, err)
require.True(t, os.IsNotExist(err))
}

func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) {
testCases := []struct {
name string
multipleTableInfos []*common.TableInfo
}{
{
name: "nil source table info",
multipleTableInfos: []*common.TableInfo{nil},
},
{
name: "short table infos",
multipleTableInfos: nil,
},
}

parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
ID: 20,
Name: ast.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{
Name: ast.NewCIStr("col1"),
FieldType: *types.NewFieldType(mysql.TypeLong),
},
},
})

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ddlEvent := &commonEvent.DDLEvent{
Query: "alter table test.table1 exchange partition p0 with table test.table2",
Type: byte(timodel.ActionExchangeTablePartition),
SchemaName: "test",
TableName: "table1",
ExtraSchemaName: "test",
ExtraTableName: "table2",
FinishedTs: 100,
TableInfo: tableInfo,
}
ddlEvent.MultipleTableInfos = append([]*common.TableInfo{tableInfo}, tc.multipleTableInfos...)

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.ErrorContains(t, err, "invalid exchange partition ddl event, source table info is missing")
})
}
}

func TestWriteCheckpointEvent(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
10 changes: 9 additions & 1 deletion downstreamadapter/sink/cloudstorage/writer.go
@@ -171,7 +171,15 @@ func (d *writer) flushMessages(ctx context.Context) error {
zap.Error(err))
return errors.Trace(err)
}
indexFilePath := d.filePathGenerator.GenerateIndexFilePath(table, date)
indexFilePath, err := d.filePathGenerator.GenerateIndexFilePath(table, date)
if err != nil {
log.Error("failed to generate index file path",
zap.Int("workerID", d.id),
zap.String("keyspace", d.changeFeedID.Keyspace()),
zap.Stringer("changefeed", d.changeFeedID.ID()),
zap.Error(err))
return errors.Trace(err)
}

// first write the data file to external storage.
err = d.writeDataFile(ctx, dataFilePath, indexFilePath, task)
1 change: 1 addition & 0 deletions pkg/config/changefeed.go
@@ -514,6 +514,7 @@ func (info *ChangeFeedInfo) rmStorageOnlyFields() {
info.Config.Sink.DateSeparator = nil
info.Config.Sink.EnablePartitionSeparator = nil
info.Config.Sink.FileIndexWidth = nil
info.Config.Sink.UseTableIDAsPath = nil
info.Config.Sink.CloudStorageConfig = nil
}

20 changes: 19 additions & 1 deletion pkg/config/sink.go
@@ -37,6 +37,8 @@ const (

// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
TxnAtomicityKey = "transaction-atomicity"
// UseTableIDAsPathKey specifies the key of the use-table-id-as-path in the SinkURI.
UseTableIDAsPathKey = "use-table-id-as-path"
// defaultTxnAtomicity is the default atomicity level.
defaultTxnAtomicity = noneTxnAtomicity
// unknownTxnAtomicity is an invalid atomicity level and will be treated as
@@ -154,6 +156,8 @@ type SinkConfig struct {
EnablePartitionSeparator *bool `toml:"enable-partition-separator" json:"enable-partition-separator,omitempty"`
// FileIndexWidth is only available when the downstream is Storage
FileIndexWidth *int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"`
// UseTableIDAsPath is only available when the downstream is Storage.
UseTableIDAsPath *bool `toml:"use-table-id-as-path" json:"use-table-id-as-path,omitempty"`

EnableKafkaSinkV2 *bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2,omitempty"`

@@ -947,6 +951,19 @@ func (s *SinkConfig) applyParameterBySinkURI(sinkURI *url.URL) error {
s.Protocol = util.AddressOf(protocolFromURI)
}

useTableIDAsPathFromURI := params.Get(UseTableIDAsPathKey)
if useTableIDAsPathFromURI != "" {
enabled, err := strconv.ParseBool(useTableIDAsPathFromURI)
if err != nil {
return errors.Trace(err)
}
if s.UseTableIDAsPath != nil && util.GetOrZero(s.UseTableIDAsPath) != enabled {
cfgInSinkURI[UseTableIDAsPathKey] = strconv.FormatBool(enabled)
cfgInFile[UseTableIDAsPathKey] = strconv.FormatBool(util.GetOrZero(s.UseTableIDAsPath))
}
s.UseTableIDAsPath = util.AddressOf(enabled)
}

getError := func() error {
if len(cfgInSinkURI) != len(cfgInFile) {
log.Panic("inconsistent configuration items in sink uri and configuration file",
@@ -978,7 +995,8 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI(
}

cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
s.TxnAtomicity != oldSinkConfig.TxnAtomicity
s.TxnAtomicity != oldSinkConfig.TxnAtomicity ||
s.UseTableIDAsPath != oldSinkConfig.UseTableIDAsPath

isURIParamsChanged := func(oldCfg SinkConfig) bool {
err := oldCfg.applyParameterBySinkURI(sinkURI)
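A stand-alone sketch of the precedence rule `applyParameterBySinkURI` now enforces for this key: the sink-URI value wins, and a disagreement with the config file is surfaced (as an incompatibility error in the real code; simplified to a print here):

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	sinkURI, err := url.Parse("file:///tmp/out?protocol=csv&use-table-id-as-path=true")
	if err != nil {
		panic(err)
	}
	fromURI := sinkURI.Query().Get("use-table-id-as-path")

	fileValue := false // pretend the changefeed config file said false
	if fromURI != "" {
		enabled, err := strconv.ParseBool(fromURI)
		if err != nil {
			panic(err)
		}
		if fileValue != enabled {
			// The real code records both values and reports them as
			// inconsistent configuration items.
			fmt.Printf("inconsistent values: uri=%v file=%v\n", enabled, fileValue)
		}
		fileValue = enabled // the URI parameter wins
	}
	fmt.Println("effective:", fileValue) // effective: true
}
```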
14 changes: 11 additions & 3 deletions pkg/sink/cloudstorage/config.go
@@ -64,9 +64,10 @@
)

type urlConfig struct {
WorkerCount *int `form:"worker-count"`
FlushInterval *string `form:"flush-interval"`
FileSize *int `form:"file-size"`
WorkerCount *int `form:"worker-count"`
FlushInterval *string `form:"flush-interval"`
FileSize *int `form:"file-size"`
UseTableIDAsPath *bool `form:"use-table-id-as-path"`
}

// Config is the configuration for cloud storage sink.
@@ -82,6 +83,7 @@ type Config struct {
OutputColumnID bool
FlushConcurrency int
EnableTableAcrossNodes bool
UseTableIDAsPath bool
}

// NewConfig returns the default cloud storage sink config.
@@ -132,6 +134,9 @@ func (c *Config) Apply(
if err != nil {
return err
}
if urlParameter.UseTableIDAsPath != nil {
c.UseTableIDAsPath = *urlParameter.UseTableIDAsPath
}
Collaborator commented:

Suggested change:
-	if urlParameter.UseTableIDAsPath != nil {
-		c.UseTableIDAsPath = *urlParameter.UseTableIDAsPath
-	}
+	c.UseTableIDAsPath = util.GetOrZero(urlParameter.UseTableIDAsPath)

Use the util.GetOrZero function to keep the code style.

Author replied: Optimized.

c.DateSeparator = util.GetOrZero(sinkConfig.DateSeparator)
c.EnablePartitionSeparator = util.GetOrZero(sinkConfig.EnablePartitionSeparator)
@@ -168,6 +173,9 @@ func mergeConfig(
dest.FlushInterval = sinkConfig.CloudStorageConfig.FlushInterval
dest.FileSize = sinkConfig.CloudStorageConfig.FileSize
}
if sinkConfig != nil && sinkConfig.UseTableIDAsPath != nil {
dest.UseTableIDAsPath = sinkConfig.UseTableIDAsPath
}
if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil {
return nil, cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err)
}
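Finally, a sketch of the effective merge order in `Config.Apply`/`mergeConfig` after this change: the changefeed config seeds `use-table-id-as-path`, then a URI parameter overrides it via `mergo.WithOverride`. The helper below is an illustration under those assumptions, not the real code:

```go
package main

import "fmt"

// effective mimics the merge order: file config first, URI override last.
func effective(fromFile, fromURI *bool) bool {
	enabled := false // Config default
	if fromFile != nil {
		enabled = *fromFile // seeded from the changefeed config
	}
	if fromURI != nil {
		enabled = *fromURI // URI parameter wins (mergo.WithOverride)
	}
	return enabled
}

func main() {
	yes, no := true, false
	fmt.Println(effective(&no, &yes)) // true: URI overrides the file
	fmt.Println(effective(&yes, nil)) // true: file value kept when the URI is silent
}
```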