Skip to content

Commit a1e6a45

Browse files
authored
add redo writer & reader (pingcap#1084)
ref pingcap#1061
1 parent 5aebb9d commit a1e6a45

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+8921
-73
lines changed

go.mod

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ go 1.23.4
55
toolchain go1.23.7
66

77
require (
8+
cloud.google.com/go/storage v1.39.1
9+
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0
810
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c
911
github.com/DATA-DOG/go-sqlmock v1.5.0
1012
github.com/IBM/sarama v1.41.2
@@ -64,13 +66,17 @@ require (
6466
go.etcd.io/etcd/api/v3 v3.5.12
6567
go.etcd.io/etcd/client/pkg/v3 v3.5.12
6668
go.etcd.io/etcd/client/v3 v3.5.12
69+
go.etcd.io/etcd/pkg/v3 v3.5.12
6770
go.etcd.io/etcd/server/v3 v3.5.12
6871
go.uber.org/atomic v1.11.0
6972
go.uber.org/goleak v1.3.0
73+
go.uber.org/mock v0.4.0
74+
go.uber.org/multierr v1.11.0
7075
go.uber.org/zap v1.27.0
7176
golang.org/x/net v0.33.0
7277
golang.org/x/oauth2 v0.24.0
7378
golang.org/x/sync v0.10.0
79+
golang.org/x/sys v0.28.0
7480
golang.org/x/term v0.27.0
7581
golang.org/x/text v0.21.0
7682
golang.org/x/time v0.7.0
@@ -83,11 +89,9 @@ require (
8389
cloud.google.com/go/compute/metadata v0.3.0 // indirect
8490
cloud.google.com/go/iam v1.1.7 // indirect
8591
cloud.google.com/go/kms v1.15.8 // indirect
86-
cloud.google.com/go/storage v1.39.1 // indirect
8792
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
8893
github.com/99designs/keyring v1.2.1 // indirect
8994
github.com/AthenZ/athenz v1.10.39 // indirect
90-
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0 // indirect
9195
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
9296
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
9397
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 // indirect
@@ -311,7 +315,6 @@ require (
311315
github.com/zeebo/xxh3 v1.0.2 // indirect
312316
go.etcd.io/bbolt v1.3.9 // indirect
313317
go.etcd.io/etcd/client/v2 v2.305.12 // indirect
314-
go.etcd.io/etcd/pkg/v3 v3.5.12 // indirect
315318
go.etcd.io/etcd/raft/v3 v3.5.12 // indirect
316319
go.opencensus.io v0.24.0 // indirect
317320
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
@@ -323,13 +326,10 @@ require (
323326
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
324327
go.opentelemetry.io/otel/trace v1.24.0 // indirect
325328
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
326-
go.uber.org/mock v0.4.0 // indirect
327-
go.uber.org/multierr v1.11.0 // indirect
328329
golang.org/x/arch v0.3.0 // indirect
329330
golang.org/x/crypto v0.31.0 // indirect
330331
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
331332
golang.org/x/mod v0.22.0 // indirect
332-
golang.org/x/sys v0.28.0 // indirect
333333
golang.org/x/tools v0.28.0 // indirect
334334
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
335335
google.golang.org/api v0.170.0 // indirect

pkg/common/event/redo.go

Lines changed: 80 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -101,22 +101,14 @@ const (
101101
func (r *DMLEvent) ToRedoLog() *RedoLog {
102102
r.FinishGetRow()
103103
row, valid := r.GetNextRow()
104-
if !valid {
105-
log.Panic("DMLEvent.ToRedoLog must be called with a valid row")
106-
}
107104
r.FinishGetRow()
108105

109106
redoLog := &RedoLog{
110107
RedoRow: RedoDMLEvent{
111108
Row: &DMLEventInRedoLog{
112-
StartTs: r.StartTs,
113-
CommitTs: r.CommitTs,
114-
Table: &common.TableName{
115-
Schema: r.TableInfo.GetSchemaName(),
116-
Table: r.TableInfo.GetTableName(),
117-
TableID: r.PhysicalTableID,
118-
IsPartition: r.TableInfo.IsPartitionTable(),
119-
},
109+
StartTs: r.StartTs,
110+
CommitTs: r.CommitTs,
111+
Table: nil,
120112
Columns: nil,
121113
PreColumns: nil,
122114
IndexColumns: nil,
@@ -127,60 +119,95 @@ func (r *DMLEvent) ToRedoLog() *RedoLog {
127119
Type: RedoLogTypeRow,
128120
}
129121

130-
columnCount := len(r.TableInfo.GetColumns())
131-
columns := make([]*RedoColumn, 0, columnCount)
132-
switch row.RowType {
133-
case RowTypeInsert:
134-
redoLog.RedoRow.Columns = make([]RedoColumnValue, 0, columnCount)
135-
case RowTypeDelete:
136-
redoLog.RedoRow.PreColumns = make([]RedoColumnValue, 0, columnCount)
137-
case RowTypeUpdate:
138-
redoLog.RedoRow.Columns = make([]RedoColumnValue, 0, columnCount)
139-
redoLog.RedoRow.PreColumns = make([]RedoColumnValue, 0, columnCount)
140-
default:
141-
}
122+
if valid && r.TableInfo != nil {
123+
redoLog.RedoRow.Row.Table = new(common.TableName)
124+
*redoLog.RedoRow.Row.Table = r.TableInfo.TableName
125+
126+
columnCount := len(r.TableInfo.GetColumns())
127+
columns := make([]*RedoColumn, 0, columnCount)
128+
switch row.RowType {
129+
case RowTypeInsert:
130+
redoLog.RedoRow.Columns = make([]RedoColumnValue, 0, columnCount)
131+
case RowTypeDelete:
132+
redoLog.RedoRow.PreColumns = make([]RedoColumnValue, 0, columnCount)
133+
case RowTypeUpdate:
134+
redoLog.RedoRow.Columns = make([]RedoColumnValue, 0, columnCount)
135+
redoLog.RedoRow.PreColumns = make([]RedoColumnValue, 0, columnCount)
136+
default:
137+
}
142138

143-
for i, column := range r.TableInfo.GetColumns() {
144-
if common.IsColCDCVisible(column) {
145-
columns = append(columns, &RedoColumn{
146-
Name: column.Name.String(),
147-
Type: column.GetType(),
148-
Charset: column.GetCharset(),
149-
Collation: column.GetCollate(),
150-
})
151-
switch row.RowType {
152-
case RowTypeInsert:
153-
v := parseColumnValue(&row.Row, column, i)
154-
redoLog.RedoRow.Columns = append(redoLog.RedoRow.Columns, v)
155-
case RowTypeDelete:
156-
v := parseColumnValue(&row.PreRow, column, i)
157-
redoLog.RedoRow.PreColumns = append(redoLog.RedoRow.PreColumns, v)
158-
case RowTypeUpdate:
159-
v := parseColumnValue(&row.Row, column, i)
160-
redoLog.RedoRow.Columns = append(redoLog.RedoRow.Columns, v)
161-
v = parseColumnValue(&row.PreRow, column, i)
162-
redoLog.RedoRow.PreColumns = append(redoLog.RedoRow.PreColumns, v)
163-
default:
139+
for i, column := range r.TableInfo.GetColumns() {
140+
if common.IsColCDCVisible(column) {
141+
columns = append(columns, &RedoColumn{
142+
Name: column.Name.String(),
143+
Type: column.GetType(),
144+
Charset: column.GetCharset(),
145+
Collation: column.GetCollate(),
146+
})
147+
switch row.RowType {
148+
case RowTypeInsert:
149+
v := parseColumnValue(&row.Row, column, i)
150+
redoLog.RedoRow.Columns = append(redoLog.RedoRow.Columns, v)
151+
case RowTypeDelete:
152+
v := parseColumnValue(&row.PreRow, column, i)
153+
redoLog.RedoRow.PreColumns = append(redoLog.RedoRow.PreColumns, v)
154+
case RowTypeUpdate:
155+
v := parseColumnValue(&row.Row, column, i)
156+
redoLog.RedoRow.Columns = append(redoLog.RedoRow.Columns, v)
157+
v = parseColumnValue(&row.PreRow, column, i)
158+
redoLog.RedoRow.PreColumns = append(redoLog.RedoRow.PreColumns, v)
159+
default:
160+
}
164161
}
165162
}
163+
redoLog.RedoRow.Row.Columns = columns
164+
redoLog.RedoRow.Row.PreColumns = columns
166165
}
167-
redoLog.RedoRow.Row.Columns = columns
168-
redoLog.RedoRow.Row.PreColumns = columns
169166

170167
return redoLog
171168
}
172169

173170
// ToRedoLog converts ddl event to redo log
174171
func (d *DDLEvent) ToRedoLog() *RedoLog {
175-
ddlInRedoLog := &DDLEventInRedoLog{
176-
StartTs: d.GetStartTs(),
177-
CommitTs: d.GetCommitTs(),
178-
Query: d.Query,
172+
redoLog := &RedoLog{
173+
RedoDDL: RedoDDLEvent{
174+
DDL: &DDLEventInRedoLog{
175+
StartTs: d.GetStartTs(),
176+
CommitTs: d.GetCommitTs(),
177+
Query: d.Query,
178+
},
179+
Type: d.Type,
180+
},
181+
Type: RedoLogTypeDDL,
179182
}
180-
return &RedoLog{
181-
RedoDDL: RedoDDLEvent{DDL: ddlInRedoLog},
182-
Type: RedoLogTypeDDL,
183+
if d.TableInfo != nil {
184+
redoLog.RedoDDL.TableName = d.TableInfo.TableName
183185
}
186+
187+
return redoLog
188+
}
189+
190+
// GetCommitTs returns commit timestamp of the log event.
191+
func (r *RedoLog) GetCommitTs() common.Ts {
192+
switch r.Type {
193+
case RedoLogTypeRow:
194+
return r.RedoRow.Row.CommitTs
195+
case RedoLogTypeDDL:
196+
return r.RedoDDL.DDL.CommitTs
197+
default:
198+
log.Panic("Unexpected redo log type")
199+
return 0
200+
}
201+
}
202+
203+
// IsDelete checks whether it's a deletion or not.
204+
func (r RedoDMLEvent) IsDelete() bool {
205+
return len(r.Row.PreColumns) > 0 && len(r.Row.Columns) == 0
206+
}
207+
208+
// IsUpdate checks whether it's a update or not.
209+
func (r RedoDMLEvent) IsUpdate() bool {
210+
return len(r.Row.PreColumns) > 0 && len(r.Row.Columns) > 0
184211
}
185212

186213
func parseColumnValue(row *chunk.Row, column *model.ColumnInfo, i int) RedoColumnValue {

pkg/common/event/redo_gen_compatibility_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ import (
1818

1919
"github.com/pingcap/ticdc/pkg/common"
2020
"github.com/pingcap/tidb/pkg/meta/model"
21+
"github.com/pingcap/tidb/pkg/types"
2122
parserModel "github.com/pingcap/tidb/pkg/parser/model"
2223
"github.com/pingcap/tidb/pkg/parser/mysql"
23-
"github.com/pingcap/tidb/pkg/types"
2424
"github.com/pingcap/tidb/pkg/util/chunk"
2525
oldArchModel "github.com/pingcap/tiflow/cdc/model"
2626
"github.com/stretchr/testify/require"
@@ -30,26 +30,26 @@ func TestDMLCompatibility(t *testing.T) {
3030
tableInfo := &model.TableInfo{
3131
Columns: []*model.ColumnInfo{
3232
&model.ColumnInfo{
33-
Name: parserModel.NewCIStr("c1"),
34-
Offset: 0,
33+
Name: parserModel.NewCIStr("c1"),
34+
Offset: 0,
3535
GeneratedExprString: "xxx",
36-
FieldType: *types.NewFieldType(mysql.TypeString),
36+
FieldType: *types.NewFieldType(mysql.TypeString),
3737
},
3838
&model.ColumnInfo{
39-
Name: parserModel.NewCIStr("c2"),
40-
Offset: 1,
39+
Name: parserModel.NewCIStr("c2"),
40+
Offset: 1,
4141
GeneratedExprString: "",
42-
FieldType: *types.NewFieldType(mysql.TypeString),
42+
FieldType: *types.NewFieldType(mysql.TypeString),
4343
},
4444
},
4545
}
4646

47-
rowEvent := &DMLEvent{
48-
StartTs: 100,
49-
CommitTs: 200,
50-
TableInfo: common.WrapTableInfo("test", tableInfo),
47+
rowEvent := &DMLEvent {
48+
StartTs: 100,
49+
CommitTs: 200,
50+
TableInfo: common.WrapTableInfo(1, "test", tableInfo),
5151
Rows: chunk.NewEmptyChunk([]*types.FieldType{
52-
&tableInfo.Columns[0].FieldType,
52+
&tableInfo.Columns[0].FieldType,
5353
&tableInfo.Columns[1].FieldType,
5454
}),
5555
RowTypes: []RowType{RowTypeUpdate},

pkg/common/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ const (
3131
var DefaultEndian = binary.LittleEndian
3232

3333
type (
34-
Ts = uint64
35-
TableID = int64
34+
Ts = uint64
35+
TableID = int64
36+
CaptureID = string
3637
)
3738

3839
type CoordinatorID string

pkg/errors/error.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,4 +486,46 @@ var (
486486
"user %s unauthorized, error: %s",
487487
errors.RFCCodeText("CDC:ErrUnauthorized"),
488488
)
489+
490+
ErrExternalStorageAPI = errors.Normalize(
491+
"external storage api",
492+
errors.RFCCodeText("CDC:ErrS3StorageAPI"),
493+
)
494+
ErrConsistentStorage = errors.Normalize(
495+
"consistent storage (%s) not support",
496+
errors.RFCCodeText("CDC:ErrConsistentStorage"),
497+
)
498+
ErrStorageInitialize = errors.Normalize(
499+
"fail to open storage for redo log",
500+
errors.RFCCodeText("CDC:ErrStorageInitialize"),
501+
)
502+
503+
ErrRedoConfigInvalid = errors.Normalize(
504+
"redo log config invalid",
505+
errors.RFCCodeText("CDC:ErrRedoConfigInvalid"),
506+
)
507+
ErrRedoDownloadFailed = errors.Normalize(
508+
"redo log down load to local failed",
509+
errors.RFCCodeText("CDC:ErrRedoDownloadFailed"),
510+
)
511+
ErrRedoWriterStopped = errors.Normalize(
512+
"redo log writer stopped",
513+
errors.RFCCodeText("CDC:ErrRedoWriterStopped"),
514+
)
515+
ErrRedoFileOp = errors.Normalize(
516+
"redo file operation",
517+
errors.RFCCodeText("CDC:ErrRedoFileOp"),
518+
)
519+
ErrRedoFileSizeExceed = errors.Normalize(
520+
"redo file size %d exceeds maximum %d",
521+
errors.RFCCodeText("CDC:ErrRedoFileSizeExceed"),
522+
)
523+
ErrRedoMetaFileNotFound = errors.Normalize(
524+
"no redo meta file found in dir: %s",
525+
errors.RFCCodeText("CDC:ErrRedoMetaFileNotFound"),
526+
)
527+
ErrRedoMetaInitialize = errors.Normalize(
528+
"initialize meta for redo log",
529+
errors.RFCCodeText("CDC:ErrRedoMetaInitialize"),
530+
)
489531
)

pkg/fsutil/disk_info_freebsd.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2021 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
//
14+
//go:build freebsd
15+
16+
package fsutil
17+
18+
import (
19+
"os"
20+
"path/filepath"
21+
"syscall"
22+
23+
cerror "github.com/pingcap/tiflow/pkg/errors"
24+
)
25+
26+
// GetDiskInfo return the disk space information of the given directory
27+
// the caller should guarantee that dir exist
28+
func GetDiskInfo(dir string) (*DiskInfo, error) {
29+
f := filepath.Join(dir, "file.test")
30+
if err := os.WriteFile(f, []byte(""), 0o600); err != nil {
31+
return nil, cerror.WrapError(cerror.ErrGetDiskInfo, err)
32+
}
33+
34+
fs := syscall.Statfs_t{}
35+
if err := syscall.Statfs(dir, &fs); err != nil {
36+
return nil, cerror.WrapError(cerror.ErrGetDiskInfo, err)
37+
}
38+
39+
info := &DiskInfo{
40+
All: fs.Blocks * uint64(fs.Bsize) / gb,
41+
Avail: uint64(fs.Bavail) * uint64(fs.Bsize) / gb,
42+
Free: fs.Bfree * uint64(fs.Bsize) / gb,
43+
}
44+
info.Used = info.All - info.Free
45+
info.AvailPercentage = float32(info.Avail) / float32(info.All) * 100
46+
47+
if err := os.Remove(f); err != nil {
48+
if !os.IsNotExist(err) {
49+
return info, cerror.WrapError(cerror.ErrGetDiskInfo, err)
50+
}
51+
}
52+
53+
return info, nil
54+
}

0 commit comments

Comments
 (0)