Skip to content

Commit 853ffa0

Browse files
committed
use binlogconsumer
1 parent 1ff8804 commit 853ffa0

File tree

7 files changed

+75
-66
lines changed

7 files changed

+75
-66
lines changed

sql/analyzer/catalog.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Catalog struct {
3434
DbProvider sql.DatabaseProvider
3535
AuthHandler sql.AuthorizationHandler
3636

37+
// BinlogConsumer holds an optional consumer that processes binlog events (e.g. for BINLOG statements).
38+
BinlogConsumer binlogreplication.BinlogConsumer
3739
// BinlogReplicaController holds an optional controller that receives forwarded binlog
3840
// replication messages (e.g. "start replica").
3941
BinlogReplicaController binlogreplication.BinlogReplicaController
@@ -53,6 +55,7 @@ func (c *Catalog) DropDbStats(ctx *sql.Context, db string, flush bool) error {
5355
}
5456

5557
var _ sql.Catalog = (*Catalog)(nil)
58+
var _ binlogreplication.BinlogConsumerCatalog = (*Catalog)(nil)
5659
var _ binlogreplication.BinlogReplicaCatalog = (*Catalog)(nil)
5760
var _ binlogreplication.BinlogPrimaryCatalog = (*Catalog)(nil)
5861

@@ -76,6 +79,14 @@ func NewCatalog(provider sql.DatabaseProvider) *Catalog {
7679
return c
7780
}
7881

82+
func (c *Catalog) HasBinlogConsumer() bool {
83+
return c.BinlogConsumer != nil
84+
}
85+
86+
func (c *Catalog) GetBinlogConsumer() binlogreplication.BinlogConsumer {
87+
return c.BinlogConsumer
88+
}
89+
7990
func (c *Catalog) HasBinlogReplicaController() bool {
8091
return c.BinlogReplicaController != nil
8192
}

sql/binlogreplication/binlog_replication.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ import (
2525
"github.com/dolthub/vitess/go/mysql"
2626
)
2727

28+
// BinlogConsumer processes binlog events. This interface can be used by any component that needs to consume
29+
// and apply binlog events, such as BINLOG statement execution, streaming replication, or other binlog processing.
30+
type BinlogConsumer interface {
31+
// ProcessEvent processes a single binlog event.
32+
ProcessEvent(ctx *sql.Context, event mysql.BinlogEvent) error
33+
34+
// HasFormatDescription returns true if a FORMAT_DESCRIPTION_EVENT has been processed.
35+
// This is required before processing TABLE_MAP and row events in BINLOG statements.
36+
HasFormatDescription() bool
37+
}
38+
2839
// BinlogReplicaController allows callers to control a binlog replica. Providers built on go-mysql-server may optionally
2940
// implement this interface and use it when constructing a SQL engine in order to receive callbacks when replication
3041
// statements (e.g. START REPLICA, SHOW REPLICA STATUS) are being handled.
@@ -62,12 +73,6 @@ type BinlogReplicaController interface {
6273
// error indicating that replication needs to be stopped before it can be reset. If any errors were encountered
6374
// resetting the replica state, an error is returned, otherwise nil is returned if the reset was successful.
6475
ResetReplica(ctx *sql.Context, resetAll bool) error
65-
66-
// ConsumeBinlogEvent processes a single parsed binlog event.
67-
ConsumeBinlogEvent(ctx *sql.Context, event mysql.BinlogEvent) error
68-
69-
// HasFormatDescription returns true if a FORMAT_DESCRIPTION_EVENT has been processed.
70-
HasFormatDescription() bool
7176
}
7277

7378
// BinlogPrimaryController allows an integrator to extend GMS with support for operating as a binlog primary server.
@@ -153,6 +158,15 @@ type ReplicaStatus struct {
153158
SourceSsl bool
154159
}
155160

161+
// BinlogConsumerCatalog extends the Catalog interface and provides methods for accessing a BinlogConsumer
162+
// for BINLOG statement execution and other binlog event processing.
163+
type BinlogConsumerCatalog interface {
164+
// HasBinlogConsumer returns true if a non-nil BinlogConsumer is available for this catalog.
165+
HasBinlogConsumer() bool
166+
// GetBinlogConsumer returns the BinlogConsumer registered with this catalog.
167+
GetBinlogConsumer() BinlogConsumer
168+
}
169+
156170
// BinlogReplicaCatalog extends the Catalog interface and provides methods for accessing a BinlogReplicaController
157171
// for a Catalog.
158172
type BinlogReplicaCatalog interface {

sql/plan/binlog.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,18 @@ const DynamicPrivilege_BinlogAdmin = "binlog_admin"
2828
// mysqldump, mysqlbinlog, and mariadb-binlog read these binary events from log files and output them as base64-encoded
2929
// BINLOG statements for replay.
3030
//
31-
// The BINLOG statement execution is delegated to the BinlogReplicaController. The base64-encoded event data is decoded
32-
// and passed to the controller's ConsumeBinlogEvents method for processing. This allows integrators like Dolt to handle
33-
// BINLOG statement execution using their existing binlog replication infrastructure.
31+
// The BINLOG statement execution is delegated to the BinlogConsumer. The base64-encoded event data is decoded
32+
// and passed to the consumer's ProcessEvent method for processing. This allows integrators like Dolt to handle
33+
// BINLOG statement execution using their binlog event processing infrastructure.
3434
//
3535
// See https://dev.mysql.com/doc/refman/8.4/en/binlog.html for the BINLOG statement specification.
3636
type Binlog struct {
37-
Base64Str string
38-
ReplicaController binlogreplication.BinlogReplicaController
37+
Base64Str string
38+
Consumer binlogreplication.BinlogConsumer
3939
}
4040

4141
var _ sql.Node = (*Binlog)(nil)
42-
var _ BinlogReplicaControllerCommand = (*Binlog)(nil)
42+
var _ BinlogConsumerCommand = (*Binlog)(nil)
4343

4444
// NewBinlog creates a new Binlog node.
4545
func NewBinlog(base64Str string) *Binlog {
@@ -48,10 +48,10 @@ func NewBinlog(base64Str string) *Binlog {
4848
}
4949
}
5050

51-
// WithBinlogReplicaController implements the BinlogReplicaControllerCommand interface.
52-
func (b *Binlog) WithBinlogReplicaController(controller binlogreplication.BinlogReplicaController) sql.Node {
51+
// WithBinlogConsumer implements the BinlogConsumerCommand interface.
52+
func (b *Binlog) WithBinlogConsumer(consumer binlogreplication.BinlogConsumer) sql.Node {
5353
nc := *b
54-
nc.ReplicaController = controller
54+
nc.Consumer = consumer
5555
return &nc
5656
}
5757

sql/plan/replication_commands.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@ const DynamicPrivilege_ReplicationSlaveAdmin = "replication_slave_admin"
3636
// See https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_replication-applier
3737
const DynamicPrivilege_ReplicationApplier = "replication_applier"
3838

39+
// BinlogConsumerCommand represents a SQL statement that requires a BinlogConsumer
40+
// (e.g. BINLOG statement).
41+
type BinlogConsumerCommand interface {
42+
sql.Node
43+
44+
// WithBinlogConsumer returns a new instance of this command, with the binlog consumer configured.
45+
WithBinlogConsumer(consumer binlogreplication.BinlogConsumer) sql.Node
46+
}
47+
3948
// BinlogReplicaControllerCommand represents a SQL statement that requires a BinlogReplicaController
4049
// (e.g. Start Replica, Show Replica Status).
4150
type BinlogReplicaControllerCommand interface {

sql/planbuilder/builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,8 @@ func (b *Builder) buildSubquery(inScope *scope, stmt ast.Statement, subQuery str
407407
}
408408
outScope = inScope.push()
409409
binlogNode := plan.NewBinlog(n.Base64Str)
410-
if binCat, ok := b.cat.(binlogreplication.BinlogReplicaCatalog); ok && binCat.HasBinlogReplicaController() {
411-
binlogNode = binlogNode.WithBinlogReplicaController(binCat.GetBinlogReplicaController()).(*plan.Binlog)
410+
if binCat, ok := b.cat.(binlogreplication.BinlogConsumerCatalog); ok && binCat.HasBinlogConsumer() {
411+
binlogNode = binlogNode.WithBinlogConsumer(binCat.GetBinlogConsumer()).(*plan.Binlog)
412412
}
413413
outScope.node = binlogNode
414414
}

sql/rowexec/binlog.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ import (
3939
//
4040
// See https://dev.mysql.com/doc/refman/8.4/en/binlog.html for the BINLOG statement specification.
4141
func (b *BaseBuilder) buildBinlog(ctx *sql.Context, n *plan.Binlog, row sql.Row) (sql.RowIter, error) {
42-
if n.ReplicaController == nil {
43-
return nil, fmt.Errorf("BINLOG statement requires BinlogReplicaController")
42+
if n.Consumer == nil {
43+
return nil, fmt.Errorf("BINLOG statement requires BinlogConsumer")
4444
}
4545

4646
var decoded []byte
@@ -60,16 +60,16 @@ func (b *BaseBuilder) buildBinlog(ctx *sql.Context, n *plan.Binlog, row sql.Row)
6060
}
6161

6262
return &binlogIter{
63-
controller: n.ReplicaController,
64-
decoded: decoded,
63+
consumer: n.Consumer,
64+
decoded: decoded,
6565
}, nil
6666
}
6767

6868
// binlogIter processes decoded binlog events one at a time, returning an OkResult when all events are processed.
6969
type binlogIter struct {
70-
controller binlogreplication.BinlogReplicaController
71-
decoded []byte
72-
offset int
70+
consumer binlogreplication.BinlogConsumer
71+
decoded []byte
72+
offset int
7373
}
7474

7575
var _ sql.RowIter = (*binlogIter)(nil)
@@ -119,13 +119,13 @@ func (bi *binlogIter) Next(ctx *sql.Context) (sql.Row, error) {
119119

120120
// Check that TABLE_MAP and row events have a FORMAT_DESCRIPTION first
121121
if event.IsTableMap() || event.IsWriteRows() || event.IsUpdateRows() || event.IsDeleteRows() {
122-
if !bi.controller.HasFormatDescription() {
122+
if !bi.consumer.HasFormatDescription() {
123123
return nil, sql.ErrNoFormatDescriptionEventBeforeBinlogStatement.New(event.TypeName())
124124
}
125125
}
126126

127-
// Process this event using the controller
128-
err := bi.controller.ConsumeBinlogEvent(ctx, event)
127+
// Process this event using the consumer
128+
err := bi.consumer.ProcessEvent(ctx, event)
129129
if err != nil {
130130
return nil, err
131131
}

sql/rowexec/binlog_test.go

Lines changed: 14 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"testing"
2222

2323
"github.com/dolthub/go-mysql-server/sql"
24-
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
2524
"github.com/dolthub/go-mysql-server/sql/plan"
2625
"github.com/dolthub/go-mysql-server/sql/types"
2726
"github.com/dolthub/vitess/go/mysql"
@@ -36,7 +35,7 @@ func TestBuildBinlog_InvalidBase64(t *testing.T) {
3635

3736
_, err := builder.buildBinlog(ctx, binlogNode, nil)
3837
require.Error(t, err)
39-
require.Contains(t, err.Error(), "BinlogReplicaController")
38+
require.Contains(t, err.Error(), "BinlogConsumer")
4039
}
4140

4241
func TestBuildBinlog_NoBinlogReplicaController(t *testing.T) {
@@ -52,57 +51,33 @@ func TestBuildBinlog_NoBinlogReplicaController(t *testing.T) {
5251

5352
_, err := builder.buildBinlog(ctx, binlogNode, nil)
5453
require.Error(t, err)
55-
require.Contains(t, err.Error(), "BinlogReplicaController")
54+
require.Contains(t, err.Error(), "BinlogConsumer")
5655
}
5756

58-
// mockBinlogReplicaController is a test implementation of BinlogReplicaController
59-
type mockBinlogReplicaController struct {
57+
// mockBinlogConsumer is a test implementation of BinlogConsumer
58+
type mockBinlogConsumer struct {
6059
consumedEvents []mysql.BinlogEvent
6160
returnError error
6261
hasFormatDesc bool
6362
}
6463

65-
func (m *mockBinlogReplicaController) ConsumeBinlogEvent(ctx *sql.Context, event mysql.BinlogEvent) error {
64+
func (m *mockBinlogConsumer) ProcessEvent(ctx *sql.Context, event mysql.BinlogEvent) error {
6665
m.consumedEvents = append(m.consumedEvents, event)
6766
if event.IsFormatDescription() {
6867
m.hasFormatDesc = true
6968
}
7069
return m.returnError
7170
}
7271

73-
func (m *mockBinlogReplicaController) HasFormatDescription() bool {
72+
func (m *mockBinlogConsumer) HasFormatDescription() bool {
7473
return m.hasFormatDesc
7574
}
7675

77-
func (m *mockBinlogReplicaController) StartReplica(ctx *sql.Context) error {
78-
return nil
79-
}
80-
81-
func (m *mockBinlogReplicaController) StopReplica(ctx *sql.Context) error {
82-
return nil
83-
}
84-
85-
func (m *mockBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context, options []binlogreplication.ReplicationOption) error {
86-
return nil
87-
}
88-
89-
func (m *mockBinlogReplicaController) SetReplicationFilterOptions(ctx *sql.Context, options []binlogreplication.ReplicationOption) error {
90-
return nil
91-
}
92-
93-
func (m *mockBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogreplication.ReplicaStatus, error) {
94-
return nil, nil
95-
}
96-
97-
func (m *mockBinlogReplicaController) ResetReplica(ctx *sql.Context, resetAll bool) error {
98-
return nil
99-
}
100-
10176
func TestBuildBinlog_WithBinlogReplicaController(t *testing.T) {
10277
builder := &BaseBuilder{}
10378
ctx := sql.NewEmptyContext()
10479

105-
mockController := &mockBinlogReplicaController{}
80+
mockConsumer := &mockBinlogConsumer{}
10681

10782
// Create a minimal valid binlog event (FORMAT_DESCRIPTION_EVENT)
10883
// Event header: timestamp(4) + type(1) + server_id(4) + event_length(4) + next_position(4) + flags(2)
@@ -112,7 +87,7 @@ func TestBuildBinlog_WithBinlogReplicaController(t *testing.T) {
11287

11388
encoded := base64.StdEncoding.EncodeToString(eventData)
11489

115-
binlogNode := plan.NewBinlog(encoded).WithBinlogReplicaController(mockController).(*plan.Binlog)
90+
binlogNode := plan.NewBinlog(encoded).WithBinlogConsumer(mockConsumer).(*plan.Binlog)
11691

11792
iter, err := builder.buildBinlog(ctx, binlogNode, nil)
11893
require.NoError(t, err)
@@ -124,7 +99,7 @@ func TestBuildBinlog_WithBinlogReplicaController(t *testing.T) {
12499
require.Equal(t, types.OkResult{}, row[0])
125100

126101
// Verify controller received one event
127-
require.Len(t, mockController.consumedEvents, 1)
102+
require.Len(t, mockConsumer.consumedEvents, 1)
128103

129104
// Next call should return EOF
130105
_, err = iter.Next(ctx)
@@ -135,7 +110,7 @@ func TestBuildBinlog_MultilineBase64WithController(t *testing.T) {
135110
builder := &BaseBuilder{}
136111
ctx := sql.NewEmptyContext()
137112

138-
mockController := &mockBinlogReplicaController{}
113+
mockConsumer := &mockBinlogConsumer{}
139114

140115
// Create two minimal events
141116
event1 := make([]byte, 19)
@@ -151,7 +126,7 @@ func TestBuildBinlog_MultilineBase64WithController(t *testing.T) {
151126
part2 := base64.StdEncoding.EncodeToString(combined[10:])
152127
multiline := part1 + "\n" + part2
153128

154-
binlogNode := plan.NewBinlog(multiline).WithBinlogReplicaController(mockController).(*plan.Binlog)
129+
binlogNode := plan.NewBinlog(multiline).WithBinlogConsumer(mockConsumer).(*plan.Binlog)
155130

156131
iter, err := builder.buildBinlog(ctx, binlogNode, nil)
157132
require.NoError(t, err)
@@ -162,7 +137,7 @@ func TestBuildBinlog_MultilineBase64WithController(t *testing.T) {
162137
require.NotNil(t, row)
163138
require.Equal(t, types.OkResult{}, row[0])
164139

165-
require.Len(t, mockController.consumedEvents, 2)
140+
require.Len(t, mockConsumer.consumedEvents, 2)
166141

167142
_, err = iter.Next(ctx)
168143
require.Equal(t, io.EOF, err)
@@ -172,7 +147,7 @@ func TestBuildBinlog_ControllerError(t *testing.T) {
172147
builder := &BaseBuilder{}
173148
ctx := sql.NewEmptyContext()
174149

175-
mockController := &mockBinlogReplicaController{
150+
mockConsumer := &mockBinlogConsumer{
176151
returnError: sql.ErrUnsupportedFeature.New("test error"),
177152
}
178153

@@ -181,7 +156,7 @@ func TestBuildBinlog_ControllerError(t *testing.T) {
181156
binary.LittleEndian.PutUint32(eventData[9:13], 19)
182157
encoded := base64.StdEncoding.EncodeToString(eventData)
183158

184-
binlogNode := plan.NewBinlog(encoded).WithBinlogReplicaController(mockController).(*plan.Binlog)
159+
binlogNode := plan.NewBinlog(encoded).WithBinlogConsumer(mockConsumer).(*plan.Binlog)
185160

186161
iter, err := builder.buildBinlog(ctx, binlogNode, nil)
187162
require.NoError(t, err)

0 commit comments

Comments
 (0)