Skip to content

Commit 0777e6c

Browse files
committed
amend to use replica controller
1 parent f9a39ec commit 0777e6c

File tree

7 files changed

+182
-728
lines changed

7 files changed

+182
-728
lines changed

enginetest/memory_engine_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -607,10 +607,6 @@ func TestLoadDataFailing(t *testing.T) {
607607
enginetest.TestLoadDataFailing(t, enginetest.NewDefaultMemoryHarness())
608608
}
609609

610-
func TestBinlog(t *testing.T) {
611-
enginetest.TestBinlog(t, enginetest.NewDefaultMemoryHarness())
612-
}
613-
614610
func TestSelectIntoFile(t *testing.T) {
615611
enginetest.TestSelectIntoFile(t, enginetest.NewDefaultMemoryHarness())
616612
}

sql/binlogreplication/binlog_replication.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import (
2727

2828
// BinlogReplicaController allows callers to control a binlog replica. Providers built on go-mysql-server may optionally
2929
// implement this interface and use it when constructing a SQL engine in order to receive callbacks when replication
30-
// statements (e.g. START REPLICA, SHOW REPLICA STATUS) are being handled.
30+
// statements (e.g. START REPLICA, SHOW REPLICA STATUS) are being handled. This interface also extends BinlogConsumer
31+
// to allow the same infrastructure to handle BINLOG statement execution.
3132
type BinlogReplicaController interface {
3233
// StartReplica tells the binlog replica controller to start up replication processes for the current replication
3334
// configuration. An error is returned if replication was unable to be started. Note the error response only signals
@@ -62,6 +63,12 @@ type BinlogReplicaController interface {
6263
// error indicating that replication needs to be stopped before it can be reset. If any errors were encountered
6364
// resetting the replica state, an error is returned, otherwise nil is returned if the reset was successful.
6465
ResetReplica(ctx *sql.Context, resetAll bool) error
66+
67+
// ConsumeBinlogEvent processes a single parsed binlog event.
68+
ConsumeBinlogEvent(ctx *sql.Context, event mysql.BinlogEvent) error
69+
70+
// HasFormatDescription returns true if a FORMAT_DESCRIPTION_EVENT has been processed.
71+
HasFormatDescription() bool
6572
}
6673

6774
// BinlogPrimaryController allows an integrator to extend GMS with support for operating as a binlog primary server.
@@ -148,7 +155,7 @@ type ReplicaStatus struct {
148155
}
149156

150157
// BinlogReplicaCatalog extends the Catalog interface and provides methods for accessing a BinlogReplicaController
151-
// for a Catalog.
158+
// for a Catalog. The controller can also be used as a BinlogConsumer for BINLOG statement execution.
152159
type BinlogReplicaCatalog interface {
153160
// HasBinlogReplicaController returns true if a non-nil BinlogReplicaController is available for this BinlogReplicaCatalog.
154161
HasBinlogReplicaController() bool

sql/plan/binlog.go

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package plan
1616

1717
import (
1818
"github.com/dolthub/go-mysql-server/sql"
19+
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
1920
"github.com/dolthub/go-mysql-server/sql/types"
2021
)
2122

@@ -27,47 +28,33 @@ const DynamicPrivilege_BinlogAdmin = "binlog_admin"
2728
// mysqldump, mysqlbinlog, and mariadb-binlog read these binary events from log files and output them as base64-encoded
2829
// BINLOG statements for replay.
2930
//
30-
// This implementation supports row-based replication (RBR) events, which is the modern standard for MySQL/MariaDB
31-
// replication (default since MySQL 5.7+ and MariaDB 10.2+).
32-
//
33-
// The base64 string is split by newlines and decoded into a buffer of raw binlog events. Each event begins with a
34-
// 19-byte header containing the event type at byte 4 and event length at bytes 9-12. The buffer is processed
35-
// sequentially, dispatching each event to a handler based on its type.
36-
//
37-
// FORMAT_DESCRIPTION_EVENT stores binlog format metadata in session state. This metadata includes header sizes for each
38-
// event type and the checksum algorithm (OFF, CRC32, or UNDEF). Subsequent events require this metadata to parse their
39-
// headers and determine whether checksums are present.
40-
//
41-
// TABLE_MAP_EVENT creates a mapping from a table ID to the database name, table name, and column metadata. Row events
42-
// reference tables by ID rather than name for encoding efficiency. The mapping is stored in a global cache for use by
43-
// subsequent row events.
44-
//
45-
// WRITE_ROWS_EVENT, UPDATE_ROWS_EVENT, and DELETE_ROWS_EVENT contain binary-encoded row data. Before parsing row data,
46-
// any CRC32 checksum appended to the event must be stripped. Checksums verify data integrity during network
47-
// transmission and disk storage but are not part of the event payload structure.
48-
//
49-
// Transaction boundary and metadata events are silently ignored since each BINLOG statement is auto-committed and full
50-
// replication semantics are not required for mysqldump replay.
51-
//
52-
// QUERY_EVENT (statement-based replication) is not currently supported. Statement-based replication is deprecated in
53-
// favor of row-based replication to correctly support non-deterministic functions.
31+
// The BINLOG statement execution is delegated to the BinlogReplicaController. The base64-encoded event data is decoded
32+
// and passed to the controller's ConsumeBinlogEvents method for processing. This allows integrators like Dolt to handle
33+
// BINLOG statement execution using their existing binlog replication infrastructure.
5434
//
5535
// See https://dev.mysql.com/doc/refman/8.4/en/binlog.html for the BINLOG statement specification.
5636
type Binlog struct {
57-
Base64Str string
58-
Catalog sql.Catalog
37+
Base64Str string
38+
ReplicaController binlogreplication.BinlogReplicaController
5939
}
6040

6141
var _ sql.Node = (*Binlog)(nil)
42+
var _ BinlogReplicaControllerCommand = (*Binlog)(nil)
6243

6344
// NewBinlog creates a new Binlog node.
64-
func NewBinlog(base64Str string, catalog sql.Catalog) *Binlog {
45+
func NewBinlog(base64Str string) *Binlog {
6546
return &Binlog{
6647
Base64Str: base64Str,
67-
Catalog: catalog,
6848
}
6949
}
7050

51+
// WithBinlogReplicaController implements the BinlogReplicaControllerCommand interface.
52+
func (b *Binlog) WithBinlogReplicaController(controller binlogreplication.BinlogReplicaController) sql.Node {
53+
nc := *b
54+
nc.ReplicaController = controller
55+
return &nc
56+
}
57+
7158
func (b *Binlog) String() string {
7259
return "BINLOG"
7360
}

sql/plan/replication_commands.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type BinlogPrimaryControllerCommand interface {
6262
// TODO: When PRIVILEGE_CHECKS_USER option is specified, validate that the assigned user account has the
6363
// REPLICATION_APPLIER privilege. This validation should happen before the option is passed to the integrator's
6464
// BinlogReplicaController.SetReplicationSourceOptions().
65-
// See https://github.com/mysql/mysql-server/blob/8.0/sql/rpl_replica.cc (change_master_cmd function)
65+
// See https://github.com/mysql/mysql-server/blob/8.0/sql/rpl_replica.cc change_master_cmd
6666
type ChangeReplicationSource struct {
6767
ReplicaController binlogreplication.BinlogReplicaController
6868
Options []binlogreplication.ReplicationOption

sql/planbuilder/builder.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,11 @@ func (b *Builder) buildSubquery(inScope *scope, stmt ast.Statement, subQuery str
406406
b.handleErr(err)
407407
}
408408
outScope = inScope.push()
409-
outScope.node = plan.NewBinlog(n.Base64Str, b.cat)
409+
binlogNode := plan.NewBinlog(n.Base64Str)
410+
if binCat, ok := b.cat.(binlogreplication.BinlogReplicaCatalog); ok && binCat.HasBinlogReplicaController() {
411+
binlogNode = binlogNode.WithBinlogReplicaController(binCat.GetBinlogReplicaController()).(*plan.Binlog)
412+
}
413+
outScope.node = binlogNode
410414
}
411415
return
412416
}

0 commit comments

Comments
 (0)