Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
SafeMode: c.Sink.SafeMode,
OpenProtocol: openProtocolConfig,
Debezium: debeziumConfig,
UseTableIDAsPath: c.Sink.UseTableIDAsPath,
}

if c.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -896,6 +897,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SafeMode: cloned.Sink.SafeMode,
DebeziumConfig: debeziumConfig,
OpenProtocolConfig: openProtocolConfig,
UseTableIDAsPath: cloned.Sink.UseTableIDAsPath,
}

if cloned.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -1156,6 +1158,7 @@ type SinkConfig struct {
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
5 changes: 5 additions & 0 deletions api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func TestReplicaConfigConversion(t *testing.T) {
EnableSyncPoint: util.AddressOf(true),
EnableTableMonitor: util.AddressOf(true),
BDRMode: util.AddressOf(true),
Sink: &SinkConfig{
UseTableIDAsPath: util.AddressOf(true),
},
Mounter: &MounterConfig{
WorkerNum: util.AddressOf(16),
},
Expand Down Expand Up @@ -61,6 +64,7 @@ func TestReplicaConfigConversion(t *testing.T) {
require.True(t, util.GetOrZero(internalCfg.EnableSyncPoint))
require.True(t, util.GetOrZero(internalCfg.EnableTableMonitor))
require.True(t, util.GetOrZero(internalCfg.BDRMode))
require.True(t, util.GetOrZero(internalCfg.Sink.UseTableIDAsPath))
require.Equal(t, internalCfg.Mounter.WorkerNum, *apiCfg.Mounter.WorkerNum)
require.True(t, util.GetOrZero(internalCfg.Scheduler.EnableTableAcrossNodes))
require.Equal(t, 1000, util.GetOrZero(internalCfg.Scheduler.RegionThreshold))
Expand All @@ -85,6 +89,7 @@ func TestReplicaConfigConversion(t *testing.T) {
require.True(t, *apiCfgBack.CaseSensitive)
require.True(t, *apiCfgBack.ForceReplicate)
require.True(t, *apiCfgBack.IgnoreIneligibleTable)
require.True(t, *apiCfgBack.Sink.UseTableIDAsPath)
require.Equal(t, 16, *apiCfgBack.Mounter.WorkerNum)
require.True(t, *apiCfgBack.Scheduler.EnableTableAcrossNodes)
require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel)
Expand Down
25 changes: 19 additions & 6 deletions downstreamadapter/sink/cloudstorage/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,37 +194,50 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
// For exchange partition, we need to write the schema of the source table.
// write the previous table first
if event.GetDDLType() == model.ActionExchangeTablePartition {
if len(event.MultipleTableInfos) < 2 || event.MultipleTableInfos[1] == nil {
return errors.ErrInternalCheckFailed.GenWithStackByArgs(
"invalid exchange partition ddl event, source table info is missing")
}
sourceTableInfo := event.MultipleTableInfos[1]

var def cloudstorage.TableDefinition
def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID)
def.Query = event.Query
def.Type = event.Type
if err := s.writeFile(event, def); err != nil {
if err := s.writeFile(event, def, event.GetTableID()); err != nil {
return err
}
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef); err != nil {
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, sourceTableInfo, event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef, sourceTableInfo.TableName.TableID); err != nil {
return err
}
} else {
for _, e := range event.GetEvents() {
var def cloudstorage.TableDefinition
def.FromDDLEvent(e, s.cfg.OutputColumnID)
if err := s.writeFile(e, def); err != nil {
if err := s.writeFile(e, def, e.GetTableID()); err != nil {
return err
}
}
}
return nil
}

func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition, tableID int64) error {
// skip write database-level event for 'use-table-id-as-path' mode
if s.cfg.UseTableIDAsPath && def.Table == "" {
log.Debug("skip database schema for table id path",
zap.String("schema", def.Schema),
zap.String("query", def.Query))
return nil
}
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, tableID)
if err != nil {
return errors.Trace(err)
}
Expand Down
155 changes: 155 additions & 0 deletions downstreamadapter/sink/cloudstorage/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,161 @@ func TestWriteDDLEvent(t *testing.T) {
}`, string(tableSchema))
}

// TestWriteDDLEventWithTableIDAsPath verifies that when the sink URI enables
// use-table-id-as-path, a table-level DDL event's schema file is written under
// a directory named after the table ID (here "20") instead of the
// schema/table-name path.
func TestWriteDDLEventWithTableIDAsPath(t *testing.T) {
parentDir := t.TempDir()
// Sink URI opts into the table-ID-as-path layout via the query parameter.
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// A mock PD clock is required by the sink's background machinery.
mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

go cloudStorageSink.Run(ctx)

// Table ID 20 is what should appear as the top-level path segment below.
tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
ID: 20,
Name: ast.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{
Name: ast.NewCIStr("col1"),
FieldType: *types.NewFieldType(mysql.TypeLong),
},
{
Name: ast.NewCIStr("col2"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
},
})
ddlEvent := &commonEvent.DDLEvent{
Query: "alter table test.table1 add col2 varchar(64)",
Type: byte(timodel.ActionAddColumn),
SchemaName: "test",
TableName: "table1",
FinishedTs: 100,
TableInfo: tableInfo,
}

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.NoError(t, err)

// The schema file lands under "<tableID>/meta/", not "<schema>/<table>/".
// The file name encodes the finished ts (100) and what is presumably a
// checksum of the definition — TODO confirm the checksum derivation.
tableDir := path.Join(parentDir, "20/meta/")
tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json"))
require.NoError(t, err)
require.Contains(t, string(tableSchema), `"Table": "table1"`)
}

// TestSkipDatabaseSchemaWithTableIDAsPath verifies that database-level DDL
// events (no table name) write no schema file at all when
// use-table-id-as-path is enabled: a table-less event has no table ID to
// build a path from, so the sink skips it.
func TestSkipDatabaseSchemaWithTableIDAsPath(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

go cloudStorageSink.Run(ctx)

// A CREATE DATABASE event: TableName is empty and TableInfo is nil,
// which is the condition the sink uses to skip writing.
ddlEvent := &commonEvent.DDLEvent{
Query: "create database test_db",
Type: byte(timodel.ActionCreateSchema),
SchemaName: "test_db",
TableName: "",
FinishedTs: 100,
TableInfo: nil,
}

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.NoError(t, err)

// The event must succeed without creating any "test_db" directory.
_, err = os.Stat(path.Join(parentDir, "test_db"))
require.Error(t, err)
require.True(t, os.IsNotExist(err))
}

// TestWriteDDLEventWithInvalidExchangePartitionEvent verifies that an
// EXCHANGE PARTITION DDL event whose MultipleTableInfos is malformed — the
// second entry (the source table) missing or nil — is rejected with an
// explicit error instead of panicking on an out-of-range index or nil
// dereference.
func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) {
testCases := []struct {
name string
multipleTableInfos []*common.TableInfo
}{
{
// Second slot present but nil.
name: "nil source table info",
multipleTableInfos: []*common.TableInfo{nil},
},
{
// Only one entry total after prepending the target table below,
// so index 1 is out of range.
name: "short table infos",
multipleTableInfos: nil,
},
}

parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
ID: 20,
Name: ast.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{
Name: ast.NewCIStr("col1"),
FieldType: *types.NewFieldType(mysql.TypeLong),
},
},
})

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ddlEvent := &commonEvent.DDLEvent{
Query: "alter table test.table1 exchange partition p0 with table test.table2",
Type: byte(timodel.ActionExchangeTablePartition),
SchemaName: "test",
TableName: "table1",
ExtraSchemaName: "test",
ExtraTableName: "table2",
FinishedTs: 100,
TableInfo: tableInfo,
}
// Slot 0 is the valid target table; slot 1 (the source) is the
// case-specific invalid value under test.
ddlEvent.MultipleTableInfos = append([]*common.TableInfo{tableInfo}, tc.multipleTableInfos...)

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
// Must match the validation error raised by the sink's
// exchange-partition check.
require.ErrorContains(t, err, "invalid exchange partition ddl event, source table info is missing")
})
}
}

func TestWriteCheckpointEvent(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
Expand Down
10 changes: 9 additions & 1 deletion downstreamadapter/sink/cloudstorage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,15 @@ func (d *writer) flushMessages(ctx context.Context) error {
zap.Error(err))
return errors.Trace(err)
}
indexFilePath := d.filePathGenerator.GenerateIndexFilePath(table, date)
indexFilePath, err := d.filePathGenerator.GenerateIndexFilePath(table, date)
if err != nil {
log.Error("failed to generate index file path",
zap.Int("workerID", d.id),
zap.String("keyspace", d.changeFeedID.Keyspace()),
zap.Stringer("changefeed", d.changeFeedID.ID()),
zap.Error(err))
return errors.Trace(err)
}

// first write the data file to external storage.
err = d.writeDataFile(ctx, dataFilePath, indexFilePath, task)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func (info *ChangeFeedInfo) rmStorageOnlyFields() {
info.Config.Sink.DateSeparator = nil
info.Config.Sink.EnablePartitionSeparator = nil
info.Config.Sink.FileIndexWidth = nil
info.Config.Sink.UseTableIDAsPath = nil
info.Config.Sink.CloudStorageConfig = nil
}

Expand Down
20 changes: 19 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (

// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
TxnAtomicityKey = "transaction-atomicity"
// UseTableIDAsPathKey specifies the key of the use-table-id-as-path in the SinkURI.
UseTableIDAsPathKey = "use-table-id-as-path"
// defaultTxnAtomicity is the default atomicity level.
defaultTxnAtomicity = noneTxnAtomicity
// unknownTxnAtomicity is an invalid atomicity level and will be treated as
Expand Down Expand Up @@ -154,6 +156,8 @@ type SinkConfig struct {
EnablePartitionSeparator *bool `toml:"enable-partition-separator" json:"enable-partition-separator,omitempty"`
// FileIndexWidth is only available when the downstream is Storage
FileIndexWidth *int `toml:"file-index-digit,omitempty" json:"file-index-digit,omitempty"`
// UseTableIDAsPath is only available when the downstream is Storage.
UseTableIDAsPath *bool `toml:"use-table-id-as-path" json:"use-table-id-as-path,omitempty"`

EnableKafkaSinkV2 *bool `toml:"enable-kafka-sink-v2" json:"enable-kafka-sink-v2,omitempty"`

Expand Down Expand Up @@ -947,6 +951,19 @@ func (s *SinkConfig) applyParameterBySinkURI(sinkURI *url.URL) error {
s.Protocol = util.AddressOf(protocolFromURI)
}

useTableIDAsPathFromURI := params.Get(UseTableIDAsPathKey)
if useTableIDAsPathFromURI != "" {
enabled, err := strconv.ParseBool(useTableIDAsPathFromURI)
if err != nil {
return errors.Trace(err)
}
if s.UseTableIDAsPath != nil && util.GetOrZero(s.UseTableIDAsPath) != enabled {
cfgInSinkURI[UseTableIDAsPathKey] = strconv.FormatBool(enabled)
cfgInFile[UseTableIDAsPathKey] = strconv.FormatBool(util.GetOrZero(s.UseTableIDAsPath))
}
s.UseTableIDAsPath = util.AddressOf(enabled)
}

getError := func() error {
if len(cfgInSinkURI) != len(cfgInFile) {
log.Panic("inconsistent configuration items in sink uri and configuration file",
Expand Down Expand Up @@ -978,7 +995,8 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI(
}

cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
s.TxnAtomicity != oldSinkConfig.TxnAtomicity
s.TxnAtomicity != oldSinkConfig.TxnAtomicity ||
s.UseTableIDAsPath != oldSinkConfig.UseTableIDAsPath

isURIParamsChanged := func(oldCfg SinkConfig) bool {
err := oldCfg.applyParameterBySinkURI(sinkURI)
Expand Down
12 changes: 9 additions & 3 deletions pkg/sink/cloudstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ const (
)

type urlConfig struct {
WorkerCount *int `form:"worker-count"`
FlushInterval *string `form:"flush-interval"`
FileSize *int `form:"file-size"`
WorkerCount *int `form:"worker-count"`
FlushInterval *string `form:"flush-interval"`
FileSize *int `form:"file-size"`
UseTableIDAsPath *bool `form:"use-table-id-as-path"`
}

// Config is the configuration for cloud storage sink.
Expand All @@ -82,6 +83,7 @@ type Config struct {
OutputColumnID bool
FlushConcurrency int
EnableTableAcrossNodes bool
UseTableIDAsPath bool
}

// NewConfig returns the default cloud storage sink config.
Expand Down Expand Up @@ -133,6 +135,7 @@ func (c *Config) Apply(
return err
}

c.UseTableIDAsPath = util.GetOrZero(urlParameter.UseTableIDAsPath)
c.DateSeparator = util.GetOrZero(sinkConfig.DateSeparator)
c.EnablePartitionSeparator = util.GetOrZero(sinkConfig.EnablePartitionSeparator)
c.FileIndexWidth = util.GetOrZero(sinkConfig.FileIndexWidth)
Expand Down Expand Up @@ -168,6 +171,9 @@ func mergeConfig(
dest.FlushInterval = sinkConfig.CloudStorageConfig.FlushInterval
dest.FileSize = sinkConfig.CloudStorageConfig.FileSize
}
if sinkConfig != nil && sinkConfig.UseTableIDAsPath != nil {
dest.UseTableIDAsPath = sinkConfig.UseTableIDAsPath
}
if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil {
return nil, cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err)
}
Expand Down
Loading