Skip to content

Commit 74c09c3

Browse files
committed
sink(cloudstorage): add use-table-id-as-path option
1 parent 039417c commit 74c09c3

File tree

8 files changed

+199
-18
lines changed

8 files changed

+199
-18
lines changed

downstreamadapter/sink/cloudstorage/sink.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,37 +194,43 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
194194
// For exchange partition, we need to write the schema of the source table.
195195
// write the previous table first
196196
if event.GetDDLType() == model.ActionExchangeTablePartition {
197+
if len(event.MultipleTableInfos) < 2 || event.MultipleTableInfos[1] == nil {
198+
return errors.ErrInternalCheckFailed.GenWithStackByArgs(
199+
"invalid exchange partition ddl event, source table info is missing")
200+
}
201+
sourceTableInfo := event.MultipleTableInfos[1]
202+
197203
var def cloudstorage.TableDefinition
198204
def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID)
199205
def.Query = event.Query
200206
def.Type = event.Type
201-
if err := s.writeFile(event, def); err != nil {
207+
if err := s.writeFile(event, def, event.GetTableID()); err != nil {
202208
return err
203209
}
204210
var sourceTableDef cloudstorage.TableDefinition
205-
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID)
206-
if err := s.writeFile(event, sourceTableDef); err != nil {
211+
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, sourceTableInfo, event.FinishedTs, s.cfg.OutputColumnID)
212+
if err := s.writeFile(event, sourceTableDef, sourceTableInfo.TableName.TableID); err != nil {
207213
return err
208214
}
209215
} else {
210216
for _, e := range event.GetEvents() {
211217
var def cloudstorage.TableDefinition
212218
def.FromDDLEvent(e, s.cfg.OutputColumnID)
213-
if err := s.writeFile(e, def); err != nil {
219+
if err := s.writeFile(e, def, e.GetTableID()); err != nil {
214220
return err
215221
}
216222
}
217223
}
218224
return nil
219225
}
220226

221-
func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
227+
func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition, tableID int64) error {
222228
encodedDef, err := def.MarshalWithQuery()
223229
if err != nil {
224230
return errors.Trace(err)
225231
}
226232

227-
path, err := def.GenerateSchemaFilePath()
233+
path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, tableID)
228234
if err != nil {
229235
return errors.Trace(err)
230236
}

downstreamadapter/sink/cloudstorage/sink_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,119 @@ func TestWriteDDLEvent(t *testing.T) {
207207
}`, string(tableSchema))
208208
}
209209

210+
// TestWriteDDLEventWithTableIDAsPath checks that, when the sink URI sets
// use-table-id-as-path=true, the schema file of a DDL event is written under
// test/20/meta/, where 20 is the ID assigned to the table info built below.
func TestWriteDDLEventWithTableIDAsPath(t *testing.T) {
211+
parentDir := t.TempDir()
212+
// Turn on the option under test through the sink URI query string.
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
213+
sinkURI, err := url.Parse(uri)
214+
require.NoError(t, err)
215+
216+
replicaConfig := config.GetDefaultReplicaConfig()
217+
err = replicaConfig.ValidateAndAdjust(sinkURI)
218+
require.NoError(t, err)
219+
220+
ctx, cancel := context.WithCancel(context.Background())
221+
defer cancel()
222+
223+
// Register a test clock as the PD clock service before the sink is created.
mockPDClock := pdutil.NewClock4Test()
224+
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
225+
226+
cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
227+
require.NoError(t, err)
228+
229+
// Run the sink in the background; it stops when ctx is cancelled by the deferred cancel.
go cloudStorageSink.Run(ctx)
230+
231+
// Table "test.table1" with ID 20; the ID is what the expected path below contains.
tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
232+
ID: 20,
233+
Name: ast.NewCIStr("table1"),
234+
Columns: []*timodel.ColumnInfo{
235+
{
236+
Name: ast.NewCIStr("col1"),
237+
FieldType: *types.NewFieldType(mysql.TypeLong),
238+
},
239+
{
240+
Name: ast.NewCIStr("col2"),
241+
FieldType: *types.NewFieldType(mysql.TypeVarchar),
242+
},
243+
},
244+
})
245+
ddlEvent := &commonEvent.DDLEvent{
246+
Query: "alter table test.table1 add col2 varchar(64)",
247+
Type: byte(timodel.ActionAddColumn),
248+
SchemaName: "test",
249+
TableName: "table1",
250+
FinishedTs: 100,
251+
TableInfo: tableInfo,
252+
}
253+
254+
err = cloudStorageSink.WriteBlockEvent(ddlEvent)
255+
require.NoError(t, err)
256+
257+
// The directory segment after the schema is the table ID (20).
tableDir := path.Join(parentDir, "test/20/meta/")
258+
// NOTE(review): the 100 in the file name presumably comes from FinishedTs and the
// trailing number is the definition checksum; confirm against the path generator.
tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json"))
259+
require.NoError(t, err)
260+
// The file content still carries the table name even though the path uses the ID.
require.Contains(t, string(tableSchema), `"Table": "table1"`)
261+
}
262+
263+
// TestWriteDDLEventWithInvalidExchangePartitionEvent checks that an exchange
// partition DDL event whose MultipleTableInfos has no usable second entry
// (either nil or absent) makes WriteBlockEvent return the
// "source table info is missing" error.
func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) {
264+
// multipleTableInfos is appended after the target table info in each case,
// so it determines what sits at index 1 of the event's MultipleTableInfos.
testCases := []struct {
265+
name string
266+
multipleTableInfos []*common.TableInfo
267+
}{
268+
{
269+
// Resulting slice: [tableInfo, nil].
name: "nil source table info",
270+
multipleTableInfos: []*common.TableInfo{nil},
271+
},
272+
{
273+
// Resulting slice: [tableInfo], i.e. fewer than two entries.
name: "short table infos",
274+
multipleTableInfos: nil,
275+
},
276+
}
277+
278+
for _, tc := range testCases {
279+
t.Run(tc.name, func(t *testing.T) {
280+
parentDir := t.TempDir()
281+
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
282+
sinkURI, err := url.Parse(uri)
283+
require.NoError(t, err)
284+
285+
replicaConfig := config.GetDefaultReplicaConfig()
286+
err = replicaConfig.ValidateAndAdjust(sinkURI)
287+
require.NoError(t, err)
288+
289+
ctx, cancel := context.WithCancel(context.Background())
290+
defer cancel()
291+
292+
cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
293+
require.NoError(t, err)
294+
295+
tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
296+
ID: 20,
297+
Name: ast.NewCIStr("table1"),
298+
Columns: []*timodel.ColumnInfo{
299+
{
300+
Name: ast.NewCIStr("col1"),
301+
FieldType: *types.NewFieldType(mysql.TypeLong),
302+
},
303+
},
304+
})
305+
ddlEvent := &commonEvent.DDLEvent{
306+
Query: "alter table test.table1 exchange partition p0 with table test.table2",
307+
Type: byte(timodel.ActionExchangeTablePartition),
308+
SchemaName: "test",
309+
TableName: "table1",
310+
ExtraSchemaName: "test",
311+
ExtraTableName: "table2",
312+
FinishedTs: 100,
313+
TableInfo: tableInfo,
314+
}
315+
// Index 0 is always the valid table info; index 1 comes from the test case.
ddlEvent.MultipleTableInfos = append([]*common.TableInfo{tableInfo}, tc.multipleTableInfos...)
316+
317+
err = cloudStorageSink.WriteBlockEvent(ddlEvent)
318+
require.ErrorContains(t, err, "invalid exchange partition ddl event, source table info is missing")
319+
})
320+
}
321+
}
322+
210323
func TestWriteCheckpointEvent(t *testing.T) {
211324
parentDir := t.TempDir()
212325
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)

pkg/sink/cloudstorage/config.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ const (
6464
)
6565

6666
type urlConfig struct {
67-
WorkerCount *int `form:"worker-count"`
68-
FlushInterval *string `form:"flush-interval"`
69-
FileSize *int `form:"file-size"`
67+
WorkerCount *int `form:"worker-count"`
68+
FlushInterval *string `form:"flush-interval"`
69+
FileSize *int `form:"file-size"`
70+
UseTableIDAsPath *bool `form:"use-table-id-as-path"`
7071
}
7172

7273
// Config is the configuration for cloud storage sink.
@@ -82,6 +83,7 @@ type Config struct {
8283
OutputColumnID bool
8384
FlushConcurrency int
8485
EnableTableAcrossNodes bool
86+
UseTableIDAsPath bool
8587
}
8688

8789
// NewConfig returns the default cloud storage sink config.
@@ -132,6 +134,9 @@ func (c *Config) Apply(
132134
if err != nil {
133135
return err
134136
}
137+
if urlParameter.UseTableIDAsPath != nil {
138+
c.UseTableIDAsPath = *urlParameter.UseTableIDAsPath
139+
}
135140

136141
c.DateSeparator = util.GetOrZero(sinkConfig.DateSeparator)
137142
c.EnablePartitionSeparator = util.GetOrZero(sinkConfig.EnablePartitionSeparator)

pkg/sink/cloudstorage/config_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ func TestConfigApply(t *testing.T) {
3333
expected.DateSeparator = config.DateSeparatorDay.String()
3434
expected.EnablePartitionSeparator = true
3535
expected.FlushConcurrency = 1
36-
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv"
36+
expected.UseTableIDAsPath = true
37+
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv&use-table-id-as-path=true"
3738
sinkURI, err := url.Parse(uri)
3839
require.Nil(t, err)
3940

@@ -77,6 +78,11 @@ func TestVerifySinkURIParams(t *testing.T) {
7778
uri: "s3://bucket/prefix?worker-count=64&flush-interval=1m30s&file-size=33554432",
7879
expectedErr: "",
7980
},
81+
{
82+
name: "sink uri with use-table-id-as-path",
83+
uri: "s3://bucket/prefix?use-table-id-as-path=true",
84+
expectedErr: "",
85+
},
8086
{
8187
name: "invalid sink uri with unknown storage scheme",
8288
uri: "xxx://tmp/test",

pkg/sink/cloudstorage/path.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ func generateSchemaFilePath(
110110
return path.Join(dir, name)
111111
}
112112

113+
// generateTablePath returns the path segment that identifies a table.
//
// When useTableIDAsPath is true and tableName is non-empty, the segment is
// the base-10 representation of tableID. In every other case tableName is
// returned unchanged; in particular an empty tableName is always returned
// as is, regardless of useTableIDAsPath.
func generateTablePath(tableName string, tableID int64, useTableIDAsPath bool) string {
	if !useTableIDAsPath || tableName == "" {
		return tableName
	}
	// strconv.FormatInt yields the same text as fmt.Sprintf("%d", tableID)
	// without the interface boxing and format-string parsing.
	return strconv.FormatInt(tableID, 10)
}
119+
113120
func generateDataFileName(enableTableAcrossNodes bool, dispatcherID string, index uint64, extension string, fileIndexWidth int) string {
114121
indexFmt := "%0" + strconv.Itoa(fileIndexWidth) + "d"
115122
if enableTableAcrossNodes {
@@ -194,7 +201,7 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
194201
}
195202

196203
// Case 1: point check if the schema file exists.
197-
tblSchemaFile, err := def.GenerateSchemaFilePath()
204+
tblSchemaFile, err := def.GenerateSchemaFilePath(f.config.UseTableIDAsPath, table.TableNameWithPhysicTableID.TableID)
198205
if err != nil {
199206
return false, err
200207
}
@@ -211,7 +218,9 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
211218
_, checksum := mustParseSchemaName(tblSchemaFile)
212219
schemaFileCnt := 0
213220
lastVersion := uint64(0)
214-
subDir := fmt.Sprintf(tableSchemaPrefix, def.Schema, def.Table)
221+
subDir := fmt.Sprintf(tableSchemaPrefix,
222+
def.Schema,
223+
generateTablePath(def.Table, table.TableNameWithPhysicTableID.TableID, f.config.UseTableIDAsPath))
215224
checksumSuffix := fmt.Sprintf("%010d.json", checksum)
216225
hasNewerSchemaVersion := false
217226
err = f.storage.WalkDir(ctx, &storage.WalkOption{
@@ -366,7 +375,12 @@ func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date str
366375
var elems []string
367376

368377
elems = append(elems, tbl.TableNameWithPhysicTableID.Schema)
369-
elems = append(elems, tbl.TableNameWithPhysicTableID.Table)
378+
elems = append(elems,
379+
generateTablePath(
380+
tbl.TableNameWithPhysicTableID.Table,
381+
tbl.TableNameWithPhysicTableID.TableID,
382+
f.config.UseTableIDAsPath,
383+
))
370384
elems = append(elems, fmt.Sprintf("%d", f.versionMap[tbl]))
371385

372386
if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition {

pkg/sink/cloudstorage/path_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,33 @@ func TestGenerateDataFilePath(t *testing.T) {
138138
require.Equal(t, fmt.Sprintf("test/table1/5/2023-01-01/CDC_%s_000001.json", table.DispatcherID.String()), path)
139139
}
140140

141+
// TestGenerateDataFilePathWithTableIDAsPath checks that, with
// UseTableIDAsPath enabled on the generator config, the generated data file
// path uses the table ID (12345) as the table segment instead of the table
// name (table1).
func TestGenerateDataFilePathWithTableIDAsPath(t *testing.T) {
142+
t.Parallel()
143+
144+
ctx, cancel := context.WithCancel(context.TODO())
145+
defer cancel()
146+
147+
table := VersionedTableName{
148+
TableNameWithPhysicTableID: commonType.TableName{
149+
Schema: "test",
150+
Table: "table1",
151+
TableID: 12345,
152+
},
153+
TableInfoVersion: 5,
154+
DispatcherID: commonType.NewDispatcherID(),
155+
}
156+
157+
dir := t.TempDir()
158+
f := testFilePathGenerator(ctx, t, dir)
159+
// Enable the option under test directly on the generator's config.
f.config.UseTableIDAsPath = true
160+
// Seed the version map so the version segment of the path is the table info version (5).
f.versionMap[table] = table.TableInfoVersion
161+
162+
date := f.GenerateDateStr()
163+
path, err := f.GenerateDataFilePath(ctx, table, date)
164+
require.NoError(t, err)
165+
// Expected layout: <schema>/<table ID>/<version>/<data file name>.
require.Equal(t, fmt.Sprintf("test/12345/5/CDC_%s_000001.json", table.DispatcherID.String()), path)
166+
}
167+
141168
func TestFetchIndexFromFileName(t *testing.T) {
142169
t.Parallel()
143170

pkg/sink/cloudstorage/table_definition.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ package cloudstorage
1414

1515
import (
1616
"encoding/json"
17+
"fmt"
1718
"sort"
1819
"strconv"
1920
"strings"
@@ -329,14 +330,19 @@ func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error) {
329330
return hasher.Sum32(), nil
330331
}
331332

332-
// GenerateSchemaFilePath generates the schema file path for TableDefinition.
333-
func (t *TableDefinition) GenerateSchemaFilePath() (string, error) {
333+
// GenerateSchemaFilePath generates the schema file path for TableDefinition.
334+
// If useTableIDAsPath is true and t is a table schema, tableID replaces the table name in the path.
335+
func (t *TableDefinition) GenerateSchemaFilePath(useTableIDAsPath bool, tableID int64) (string, error) {
334336
checksum, err := t.Sum32(nil)
335337
if err != nil {
336338
return "", err
337339
}
338340
if !t.IsTableSchema() && t.Table != "" {
339341
log.Panic("invalid table definition", zap.Any("tableDef", t))
340342
}
341-
return generateSchemaFilePath(t.Schema, t.Table, t.TableVersion, checksum), nil
343+
table := t.Table
344+
if useTableIDAsPath && t.IsTableSchema() {
345+
table = fmt.Sprintf("%d", tableID)
346+
}
347+
return generateSchemaFilePath(t.Schema, table, t.TableVersion, checksum), nil
342348
}

pkg/sink/cloudstorage/table_definition_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,14 +488,18 @@ func TestTableDefinitionGenFilePath(t *testing.T) {
488488
Version: defaultTableDefinitionVersion,
489489
TableVersion: 100,
490490
}
491-
schemaPath, err := schemaDef.GenerateSchemaFilePath()
491+
schemaPath, err := schemaDef.GenerateSchemaFilePath(false, 0)
492492
require.NoError(t, err)
493493
require.Equal(t, "schema1/meta/schema_100_3233644819.json", schemaPath)
494494

495495
def, _ := generateTableDef()
496-
tablePath, err := def.GenerateSchemaFilePath()
496+
tablePath, err := def.GenerateSchemaFilePath(false, 0)
497497
require.NoError(t, err)
498498
require.Equal(t, "schema1/table1/meta/schema_100_3752767265.json", tablePath)
499+
500+
tablePath, err = def.GenerateSchemaFilePath(true, 12345)
501+
require.NoError(t, err)
502+
require.Equal(t, "schema1/12345/meta/schema_100_3752767265.json", tablePath)
499503
}
500504

501505
func TestTableDefinitionSum32(t *testing.T) {

0 commit comments

Comments
 (0)