Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions downstreamadapter/sink/cloudstorage/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,37 +194,43 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
// For exchange partition, we need to write the schema of the source table.
// write the previous table first
if event.GetDDLType() == model.ActionExchangeTablePartition {
if len(event.MultipleTableInfos) < 2 || event.MultipleTableInfos[1] == nil {
return errors.ErrInternalCheckFailed.GenWithStackByArgs(
"invalid exchange partition ddl event, source table info is missing")
}
sourceTableInfo := event.MultipleTableInfos[1]

var def cloudstorage.TableDefinition
def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID)
def.Query = event.Query
def.Type = event.Type
if err := s.writeFile(event, def); err != nil {
if err := s.writeFile(event, def, event.GetTableID()); err != nil {
return err
}
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef); err != nil {
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, sourceTableInfo, event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef, sourceTableInfo.TableName.TableID); err != nil {
return err
}
} else {
for _, e := range event.GetEvents() {
var def cloudstorage.TableDefinition
def.FromDDLEvent(e, s.cfg.OutputColumnID)
if err := s.writeFile(e, def); err != nil {
if err := s.writeFile(e, def, e.GetTableID()); err != nil {
return err
}
}
}
return nil
}

func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition, tableID int64) error {
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, tableID)
if err != nil {
return errors.Trace(err)
}
Expand Down
117 changes: 117 additions & 0 deletions downstreamadapter/sink/cloudstorage/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,123 @@ func TestWriteDDLEvent(t *testing.T) {
}`, string(tableSchema))
}

// TestWriteDDLEventWithTableIDAsPath verifies that, when the sink URI sets
// use-table-id-as-path=true, the DDL schema file is written under the table's
// physical ID ("test/20/meta/...") rather than under the table name.
func TestWriteDDLEventWithTableIDAsPath(t *testing.T) {
	dir := t.TempDir()
	sinkURI, err := url.Parse(fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", dir))
	require.NoError(t, err)

	cfg := config.GetDefaultReplicaConfig()
	require.NoError(t, cfg.ValidateAndAdjust(sinkURI))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())

	s, err := newSinkForTest(ctx, cfg, sinkURI, nil)
	require.NoError(t, err)
	go s.Run(ctx)

	info := common.WrapTableInfo("test", &timodel.TableInfo{
		ID:   20,
		Name: ast.NewCIStr("table1"),
		Columns: []*timodel.ColumnInfo{
			{Name: ast.NewCIStr("col1"), FieldType: *types.NewFieldType(mysql.TypeLong)},
			{Name: ast.NewCIStr("col2"), FieldType: *types.NewFieldType(mysql.TypeVarchar)},
		},
	})
	event := &commonEvent.DDLEvent{
		Query:      "alter table test.table1 add col2 varchar(64)",
		Type:       byte(timodel.ActionAddColumn),
		SchemaName: "test",
		TableName:  "table1",
		FinishedTs: 100,
		TableInfo:  info,
	}

	require.NoError(t, s.WriteBlockEvent(event))

	// The meta directory is keyed by the physical table ID (20), not "table1".
	schema, err := os.ReadFile(path.Join(dir, "test/20/meta/", "schema_100_4192708364.json"))
	require.NoError(t, err)
	require.Contains(t, string(schema), `"Table": "table1"`)
}

// TestWriteDDLEventWithInvalidExchangePartitionEvent checks that an
// exchange-partition DDL event whose source table info is absent or nil is
// rejected with an explicit error instead of causing an out-of-range access.
func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) {
	cases := []struct {
		name string
		// extra entries appended after the target table info in MultipleTableInfos
		sources []*common.TableInfo
	}{
		{name: "nil source table info", sources: []*common.TableInfo{nil}},
		{name: "short table infos", sources: nil},
	}

	dir := t.TempDir()
	sinkURI, err := url.Parse(fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", dir))
	require.NoError(t, err)

	cfg := config.GetDefaultReplicaConfig()
	require.NoError(t, cfg.ValidateAndAdjust(sinkURI))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	appcontext.SetService(appcontext.DefaultPDClock, pdutil.NewClock4Test())

	s, err := newSinkForTest(ctx, cfg, sinkURI, nil)
	require.NoError(t, err)

	info := common.WrapTableInfo("test", &timodel.TableInfo{
		ID:   20,
		Name: ast.NewCIStr("table1"),
		Columns: []*timodel.ColumnInfo{
			{Name: ast.NewCIStr("col1"), FieldType: *types.NewFieldType(mysql.TypeLong)},
		},
	})

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			event := &commonEvent.DDLEvent{
				Query:           "alter table test.table1 exchange partition p0 with table test.table2",
				Type:            byte(timodel.ActionExchangeTablePartition),
				SchemaName:      "test",
				TableName:       "table1",
				ExtraSchemaName: "test",
				ExtraTableName:  "table2",
				FinishedTs:      100,
				TableInfo:       info,
			}
			event.MultipleTableInfos = append([]*common.TableInfo{info}, tc.sources...)

			err := s.WriteBlockEvent(event)
			require.ErrorContains(t, err, "invalid exchange partition ddl event, source table info is missing")
		})
	}
}

func TestWriteCheckpointEvent(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
Expand Down
11 changes: 8 additions & 3 deletions pkg/sink/cloudstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ const (
)

type urlConfig struct {
WorkerCount *int `form:"worker-count"`
FlushInterval *string `form:"flush-interval"`
FileSize *int `form:"file-size"`
WorkerCount *int `form:"worker-count"`
FlushInterval *string `form:"flush-interval"`
FileSize *int `form:"file-size"`
UseTableIDAsPath *bool `form:"use-table-id-as-path"`
}

// Config is the configuration for cloud storage sink.
Expand All @@ -82,6 +83,7 @@ type Config struct {
OutputColumnID bool
FlushConcurrency int
EnableTableAcrossNodes bool
UseTableIDAsPath bool
}

// NewConfig returns the default cloud storage sink config.
Expand Down Expand Up @@ -132,6 +134,9 @@ func (c *Config) Apply(
if err != nil {
return err
}
if urlParameter.UseTableIDAsPath != nil {
c.UseTableIDAsPath = *urlParameter.UseTableIDAsPath
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if urlParameter.UseTableIDAsPath != nil {
c.UseTableIDAsPath = *urlParameter.UseTableIDAsPath
}
c.UseTableIDAsPath = util.GetOrZero(urlParameter.UseTableIDAsPath)

Use the util.GetOrZero function to keep the code style

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimized.


c.DateSeparator = util.GetOrZero(sinkConfig.DateSeparator)
c.EnablePartitionSeparator = util.GetOrZero(sinkConfig.EnablePartitionSeparator)
Expand Down
8 changes: 7 additions & 1 deletion pkg/sink/cloudstorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestConfigApply(t *testing.T) {
expected.DateSeparator = config.DateSeparatorDay.String()
expected.EnablePartitionSeparator = true
expected.FlushConcurrency = 1
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv"
expected.UseTableIDAsPath = true
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv&use-table-id-as-path=true"
sinkURI, err := url.Parse(uri)
require.Nil(t, err)

Expand Down Expand Up @@ -77,6 +78,11 @@ func TestVerifySinkURIParams(t *testing.T) {
uri: "s3://bucket/prefix?worker-count=64&flush-interval=1m30s&file-size=33554432",
expectedErr: "",
},
{
name: "sink uri with use-table-id-as-path",
uri: "s3://bucket/prefix?use-table-id-as-path=true",
expectedErr: "",
},
{
name: "invalid sink uri with unknown storage scheme",
uri: "xxx://tmp/test",
Expand Down
30 changes: 23 additions & 7 deletions pkg/sink/cloudstorage/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ func mustParseSchemaName(path string) (uint64, uint32) {

func generateSchemaFilePath(
schema, table string, tableVersion uint64, checksum uint32,
) string {
) (string, error) {
if schema == "" || tableVersion == 0 {
log.Panic("invalid schema or tableVersion",
zap.String("schema", schema), zap.String("table", table), zap.Uint64("tableVersion", tableVersion))
return "", errors.ErrInternalCheckFailed.GenWithStackByArgs(
fmt.Sprintf("invalid schema or tableVersion, schema=%q table=%q tableVersion=%d",
schema, table, tableVersion),
)
}

var dir string
Expand All @@ -107,7 +109,14 @@ func generateSchemaFilePath(
dir = fmt.Sprintf(tableSchemaPrefix, schema, table)
}
name := fmt.Sprintf(schemaFileNameFormat, tableVersion, checksum)
return path.Join(dir, name)
return path.Join(dir, name), nil
}

// generateTablePath returns the path segment identifying a table: the
// physical table ID when useTableIDAsPath is enabled, otherwise the table
// name. An empty tableName (e.g. a schema-only definition) is returned
// unchanged regardless of the flag.
func generateTablePath(tableName string, tableID int64, useTableIDAsPath bool) string {
	if !useTableIDAsPath || tableName == "" {
		return tableName
	}
	return fmt.Sprintf("%d", tableID)
}

func generateDataFileName(enableTableAcrossNodes bool, dispatcherID string, index uint64, extension string, fileIndexWidth int) string {
Expand Down Expand Up @@ -194,7 +203,7 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
}

// Case 1: point check if the schema file exists.
tblSchemaFile, err := def.GenerateSchemaFilePath()
tblSchemaFile, err := def.GenerateSchemaFilePath(f.config.UseTableIDAsPath, table.TableNameWithPhysicTableID.TableID)
if err != nil {
return false, err
}
Expand All @@ -211,7 +220,9 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
_, checksum := mustParseSchemaName(tblSchemaFile)
schemaFileCnt := 0
lastVersion := uint64(0)
subDir := fmt.Sprintf(tableSchemaPrefix, def.Schema, def.Table)
subDir := fmt.Sprintf(tableSchemaPrefix,
def.Schema,
generateTablePath(def.Table, table.TableNameWithPhysicTableID.TableID, f.config.UseTableIDAsPath))
checksumSuffix := fmt.Sprintf("%010d.json", checksum)
hasNewerSchemaVersion := false
err = f.storage.WalkDir(ctx, &storage.WalkOption{
Expand Down Expand Up @@ -366,7 +377,12 @@ func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date str
var elems []string

elems = append(elems, tbl.TableNameWithPhysicTableID.Schema)
elems = append(elems, tbl.TableNameWithPhysicTableID.Table)
elems = append(elems,
generateTablePath(
tbl.TableNameWithPhysicTableID.Table,
tbl.TableNameWithPhysicTableID.TableID,
f.config.UseTableIDAsPath,
))
elems = append(elems, fmt.Sprintf("%d", f.versionMap[tbl]))

if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition {
Expand Down
27 changes: 27 additions & 0 deletions pkg/sink/cloudstorage/path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,33 @@ func TestGenerateDataFilePath(t *testing.T) {
require.Equal(t, fmt.Sprintf("test/table1/5/2023-01-01/CDC_%s_000001.json", table.DispatcherID.String()), path)
}

// TestGenerateDataFilePathWithTableIDAsPath verifies that data files are laid
// out under the physical table ID ("test/12345/...") when UseTableIDAsPath is
// enabled on the file path generator's config.
func TestGenerateDataFilePathWithTableIDAsPath(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	tbl := VersionedTableName{
		TableNameWithPhysicTableID: commonType.TableName{
			Schema:  "test",
			Table:   "table1",
			TableID: 12345,
		},
		TableInfoVersion: 5,
		DispatcherID:     commonType.NewDispatcherID(),
	}

	gen := testFilePathGenerator(ctx, t, t.TempDir())
	gen.config.UseTableIDAsPath = true
	gen.versionMap[tbl] = tbl.TableInfoVersion

	// Named `got` (not `path`) to avoid shadowing the path package.
	got, err := gen.GenerateDataFilePath(ctx, tbl, gen.GenerateDateStr())
	require.NoError(t, err)
	want := fmt.Sprintf("test/12345/5/CDC_%s_000001.json", tbl.DispatcherID.String())
	require.Equal(t, want, got)
}

func TestFetchIndexFromFileName(t *testing.T) {
t.Parallel()

Expand Down
36 changes: 31 additions & 5 deletions pkg/sink/cloudstorage/table_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,40 @@ func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error) {
return hasher.Sum32(), nil
}

// GenerateSchemaFilePath generates the schema file path for TableDefinition.
func (t *TableDefinition) GenerateSchemaFilePath() (string, error) {
// GenerateSchemaFilePath generates the schema file path for TableDefinition
// with optional table id path.
func (t *TableDefinition) GenerateSchemaFilePath(useTableIDAsPath bool, tableID int64) (string, error) {
if t.Schema == "" {
return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("schema cannot be empty")
}
if t.TableVersion == 0 {
return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("table version cannot be zero")
}
if len(t.Columns) != t.TotalColumns {
return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table definition")
}
isTableSchema := t.TotalColumns != 0

checksum, err := t.Sum32(nil)
if err != nil {
return "", err
}
if !t.IsTableSchema() && t.Table != "" {
log.Panic("invalid table definition", zap.Any("tableDef", t))
if !isTableSchema && t.Table != "" {
return "", errors.ErrInternalCheckFailed.GenWithStackByArgs(
"invalid table definition",
)
}
if useTableIDAsPath && t.Table != "" {
if tableID <= 0 {
return "", errors.ErrInternalCheckFailed.GenWithStackByArgs(
"invalid table id for table-id path",
)
}
}
table := generateTablePath(t.Table, tableID, useTableIDAsPath)
path, err := generateSchemaFilePath(t.Schema, table, t.TableVersion, checksum)
if err != nil {
return "", err
}
return generateSchemaFilePath(t.Schema, t.Table, t.TableVersion, checksum), nil
return path, nil
}
Loading