Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,19 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
OutputOldValue: c.Sink.OpenProtocolConfig.OutputOldValue,
}
}
var outboxConfig *config.OutboxConfig
if c.Sink.OutboxConfig != nil {
headerColumns := make(map[string]string, len(c.Sink.OutboxConfig.HeaderColumns))
for header, column := range c.Sink.OutboxConfig.HeaderColumns {
headerColumns[header] = column
}
outboxConfig = &config.OutboxConfig{
IDColumn: c.Sink.OutboxConfig.IDColumn,
KeyColumn: c.Sink.OutboxConfig.KeyColumn,
ValueColumn: c.Sink.OutboxConfig.ValueColumn,
HeaderColumns: headerColumns,
}
}

res.Sink = &config.SinkConfig{
DispatchRules: dispatchRules,
Expand All @@ -541,6 +554,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
SafeMode: c.Sink.SafeMode,
OpenProtocol: openProtocolConfig,
Debezium: debeziumConfig,
Outbox: outboxConfig,
}

if c.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -875,6 +889,19 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
OutputOldValue: cloned.Sink.OpenProtocol.OutputOldValue,
}
}
var outboxConfig *OutboxConfig
if cloned.Sink.Outbox != nil {
headerColumns := make(map[string]string, len(cloned.Sink.Outbox.HeaderColumns))
for header, column := range cloned.Sink.Outbox.HeaderColumns {
headerColumns[header] = column
}
outboxConfig = &OutboxConfig{
IDColumn: cloned.Sink.Outbox.IDColumn,
KeyColumn: cloned.Sink.Outbox.KeyColumn,
ValueColumn: cloned.Sink.Outbox.ValueColumn,
HeaderColumns: headerColumns,
}
}
res.Sink = &SinkConfig{
Protocol: cloned.Sink.Protocol,
SchemaRegistry: cloned.Sink.SchemaRegistry,
Expand All @@ -896,6 +923,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SafeMode: cloned.Sink.SafeMode,
DebeziumConfig: debeziumConfig,
OpenProtocolConfig: openProtocolConfig,
OutboxConfig: outboxConfig,
}

if cloned.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -1156,6 +1184,7 @@ type SinkConfig struct {
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
OutboxConfig *OutboxConfig `json:"outbox,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down Expand Up @@ -1529,6 +1558,15 @@ type DebeziumConfig struct {
OutputOldValue bool `json:"output_old_value"`
}

// OutboxConfig represents configurations for outbox-json protocol encoding.
type OutboxConfig struct {
IDColumn string `json:"id_column"`
KeyColumn string `json:"key_column"`
ValueColumn string `json:"value_column"`
// HeaderColumns maps output header key to source column name.
HeaderColumns map[string]string `json:"header_columns,omitempty"`
}

type DispatcherCount struct {
Count int `json:"count"`
}
Expand Down
33 changes: 33 additions & 0 deletions api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,36 @@ func TestReplicaConfigConversion(t *testing.T) {
require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel)
require.Equal(t, "eventual", *apiCfgBack.Consistent.Level)
}

func TestReplicaConfigConversionOutboxConfig(t *testing.T) {
t.Parallel()

protocol := "outbox-json"
apiCfg := &ReplicaConfig{
Sink: &SinkConfig{
Protocol: &protocol,
OutboxConfig: &OutboxConfig{
IDColumn: "id",
KeyColumn: "aggregate_id",
ValueColumn: "payload",
HeaderColumns: map[string]string{"event_type": "event_type"},
},
},
}

internalCfg := apiCfg.ToInternalReplicaConfig()
require.Equal(t, "outbox-json", util.GetOrZero(internalCfg.Sink.Protocol))
require.NotNil(t, internalCfg.Sink.Outbox)
require.Equal(t, "id", internalCfg.Sink.Outbox.IDColumn)
require.Equal(t, "aggregate_id", internalCfg.Sink.Outbox.KeyColumn)
require.Equal(t, "payload", internalCfg.Sink.Outbox.ValueColumn)
require.Equal(t, map[string]string{"event_type": "event_type"}, internalCfg.Sink.Outbox.HeaderColumns)

backToAPI := ToAPIReplicaConfig(internalCfg)
require.NotNil(t, backToAPI.Sink)
require.NotNil(t, backToAPI.Sink.OutboxConfig)
require.Equal(t, "id", backToAPI.Sink.OutboxConfig.IDColumn)
require.Equal(t, "aggregate_id", backToAPI.Sink.OutboxConfig.KeyColumn)
require.Equal(t, "payload", backToAPI.Sink.OutboxConfig.ValueColumn)
require.Equal(t, map[string]string{"event_type": "event_type"}, backToAPI.Sink.OutboxConfig.HeaderColumns)
}
29 changes: 27 additions & 2 deletions downstreamadapter/sink/columnselector/column_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package columnselector

import (
"strings"

"github.com/pingcap/ticdc/downstreamadapter/sink/eventrouter"
"github.com/pingcap/ticdc/downstreamadapter/sink/eventrouter/partition"
"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -71,7 +73,8 @@ func (s *ColumnSelector) Select(colInfo *model.ColumnInfo) bool {
// ColumnSelectors manages an array of selectors, the first selector match the given
// event is used to select out columns.
type ColumnSelectors struct {
selectors []*ColumnSelector
selectors []*ColumnSelector
outboxRequiredColumns []string
}

// New return a column selectors
Expand All @@ -86,7 +89,8 @@ func New(sinkConfig *config.SinkConfig) (*ColumnSelectors, error) {
}

return &ColumnSelectors{
selectors: selectors,
selectors: selectors,
outboxRequiredColumns: sinkConfig.OutboxRequiredColumns(),
}, nil
}

Expand Down Expand Up @@ -116,6 +120,15 @@ func (c *ColumnSelectors) VerifyTables(
}

retainedColumns := make(map[string]struct{})
topicColumns := make(map[string]struct{})
for _, col := range eventRouter.GetTopicDispatchColumns(table.TableName.Schema, table.TableName.Table) {
topicColumns[strings.ToLower(col)] = struct{}{}
}
outboxColumns := make(map[string]struct{})
for _, col := range c.outboxRequiredColumns {
outboxColumns[strings.ToLower(col)] = struct{}{}
}

for _, columnInfo := range table.GetColumns() {
columnName := columnInfo.Name.O
if s.columnM.MatchColumn(columnName) {
Expand All @@ -135,6 +148,18 @@ func (c *ColumnSelectors) VerifyTables(
}
default:
}

lowerColumnName := strings.ToLower(columnName)
if _, ok := topicColumns[lowerColumnName]; ok {
return errors.ErrColumnSelectorFailed.GenWithStack(
"the filtered out column is used in the topic dispatcher, table: %v, column: %s",
table.TableName, columnInfo.Name)
}
if _, ok := outboxColumns[lowerColumnName]; ok {
return errors.ErrColumnSelectorFailed.GenWithStack(
"the filtered out column is required by outbox config, table: %v, column: %s",
table.TableName, columnInfo.Name)
}
}

if !verifyIndices(table, retainedColumns) {
Expand Down
129 changes: 129 additions & 0 deletions downstreamadapter/sink/columnselector/column_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package columnselector
import (
"testing"

"github.com/pingcap/ticdc/downstreamadapter/sink/eventrouter"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -182,3 +185,129 @@ func TestColumnSelectorGetSelector(t *testing.T) {
}
}
}

func TestVerifyTablesRejectFilteredTopicDispatchColumn(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("create database column_selector_topic")
helper.Tk().MustExec("use column_selector_topic")
helper.DDL2Job("create table t (id int primary key, topic_key varchar(64), payload varchar(64))")

dmlEvent := helper.DML2Event("column_selector_topic", "t",
"insert into t values (1, 'topic-a', 'payload')")
tableInfo := dmlEvent.TableInfo

sinkConfig := &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{
Matcher: []string{"column_selector_topic.*"},
TopicRule: "topic_{column:topic_key}",
},
},
ColumnSelectors: []*config.ColumnSelector{
{
Matcher: []string{"column_selector_topic.*"},
Columns: []string{"id", "payload"},
},
},
}

selectors, err := New(sinkConfig)
require.NoError(t, err)
router, err := eventrouter.NewEventRouter(sinkConfig, "default_topic", false, false)
require.NoError(t, err)

err = selectors.VerifyTables([]*common.TableInfo{tableInfo}, router)
require.ErrorContains(t, err, "used in the topic dispatcher")
}

func TestVerifyTablesRejectFilteredOutboxColumn(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("create database column_selector_outbox")
helper.Tk().MustExec("use column_selector_outbox")
helper.DDL2Job("create table t (" +
"id int primary key, " +
"aggregate_id varchar(64), " +
"payload varchar(64), " +
"event_type varchar(64))")

dmlEvent := helper.DML2Event("column_selector_outbox", "t",
"insert into t values (1, 'agg-1', 'payload', 'created')")
tableInfo := dmlEvent.TableInfo

protocol := "outbox-json"
sinkConfig := &config.SinkConfig{
Protocol: &protocol,
Outbox: &config.OutboxConfig{
IDColumn: "id",
KeyColumn: "aggregate_id",
ValueColumn: "payload",
HeaderColumns: map[string]string{"event_type": "event_type"},
},
ColumnSelectors: []*config.ColumnSelector{
{
Matcher: []string{"column_selector_outbox.*"},
Columns: []string{"id", "aggregate_id"},
},
},
}

selectors, err := New(sinkConfig)
require.NoError(t, err)
router, err := eventrouter.NewEventRouter(sinkConfig, "default_topic", false, false)
require.NoError(t, err)

err = selectors.VerifyTables([]*common.TableInfo{tableInfo}, router)
require.ErrorContains(t, err, "required by outbox config")
}

func TestVerifyTablesAllowsRetainedTopicAndOutboxColumns(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("create database column_selector_allow")
helper.Tk().MustExec("use column_selector_allow")
helper.DDL2Job("create table t (" +
"id int primary key, " +
"aggregate_id varchar(64), " +
"payload varchar(64), " +
"event_type varchar(64), " +
"topic_key varchar(64))")

dmlEvent := helper.DML2Event("column_selector_allow", "t",
"insert into t values (1, 'agg-1', 'payload', 'created', 'route-a')")
tableInfo := dmlEvent.TableInfo

protocol := "outbox-json"
sinkConfig := &config.SinkConfig{
Protocol: &protocol,
Outbox: &config.OutboxConfig{
IDColumn: "id",
KeyColumn: "aggregate_id",
ValueColumn: "payload",
HeaderColumns: map[string]string{"event_type": "event_type"},
},
DispatchRules: []*config.DispatchRule{
{
Matcher: []string{"column_selector_allow.*"},
TopicRule: "topic_{column:topic_key}",
},
},
ColumnSelectors: []*config.ColumnSelector{
{
Matcher: []string{"column_selector_allow.*"},
Columns: []string{"id", "aggregate_id", "payload", "event_type", "topic_key"},
},
},
}

selectors, err := New(sinkConfig)
require.NoError(t, err)
router, err := eventrouter.NewEventRouter(sinkConfig, "default_topic", false, false)
require.NoError(t, err)

require.NoError(t, selectors.VerifyTables([]*common.TableInfo{tableInfo}, router))
}
Loading