Skip to content

Commit 4f87463

Browse files
authored
Merge pull request #3279 from dolthub/elian/9887
dolthub/dolt#9887: Add `BINLOG` and `mariadb-binlog` support
2 parents 38e56c0 + 16a53c3 commit 4f87463

File tree

19 files changed

+938
-51
lines changed

19 files changed

+938
-51
lines changed

enginetest/queries/queries.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5031,6 +5031,8 @@ SELECT * FROM cte WHERE d = 2;`,
50315031
{"gtid_next", "AUTOMATIC"},
50325032
{"gtid_owned", ""},
50335033
{"gtid_purged", ""},
5034+
{"gtid_domain_id", 0},
5035+
{"gtid_seq_no", 0},
50345036
},
50355037
},
50365038
{

sql/analyzer/catalog.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Catalog struct {
3434
DbProvider sql.DatabaseProvider
3535
AuthHandler sql.AuthorizationHandler
3636

37+
// BinlogConsumer holds an optional consumer that processes binlog events (e.g. for BINLOG statements).
38+
BinlogConsumer binlogreplication.BinlogConsumer
3739
// BinlogReplicaController holds an optional controller that receives forwarded binlog
3840
// replication messages (e.g. "start replica").
3941
BinlogReplicaController binlogreplication.BinlogReplicaController
@@ -53,8 +55,9 @@ func (c *Catalog) DropDbStats(ctx *sql.Context, db string, flush bool) error {
5355
}
5456

5557
var _ sql.Catalog = (*Catalog)(nil)
56-
var _ binlogreplication.BinlogReplicaCatalog = (*Catalog)(nil)
57-
var _ binlogreplication.BinlogPrimaryCatalog = (*Catalog)(nil)
58+
var _ binlogreplication.BinlogConsumerProvider = (*Catalog)(nil)
59+
var _ binlogreplication.BinlogReplicaProvider = (*Catalog)(nil)
60+
var _ binlogreplication.BinlogPrimaryProvider = (*Catalog)(nil)
5861

5962
type tableLocks map[string]struct{}
6063

@@ -76,6 +79,14 @@ func NewCatalog(provider sql.DatabaseProvider) *Catalog {
7679
return c
7780
}
7881

82+
func (c *Catalog) HasBinlogConsumer() bool {
83+
return c.BinlogConsumer != nil
84+
}
85+
86+
func (c *Catalog) GetBinlogConsumer() binlogreplication.BinlogConsumer {
87+
return c.BinlogConsumer
88+
}
89+
7990
func (c *Catalog) HasBinlogReplicaController() bool {
8091
return c.BinlogReplicaController != nil
8192
}

sql/binlogreplication/binlog_replication.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ import (
2525
"github.com/dolthub/vitess/go/mysql"
2626
)
2727

28+
// BinlogConsumer processes binlog events. This interface can be used by any component that needs to consume
29+
// and apply binlog events, such as BINLOG statement execution, streaming replication, or other binlog processing.
30+
type BinlogConsumer interface {
31+
// ProcessEvent processes a single binlog event.
32+
ProcessEvent(ctx *sql.Context, event mysql.BinlogEvent) error
33+
34+
// HasFormatDescription returns true if a FORMAT_DESCRIPTION_EVENT has been processed.
35+
// This is required before processing TABLE_MAP and row events in BINLOG statements.
36+
HasFormatDescription() bool
37+
}
38+
2839
// BinlogReplicaController allows callers to control a binlog replica. Providers built on go-mysql-server may optionally
2940
// implement this interface and use it when constructing a SQL engine in order to receive callbacks when replication
3041
// statements (e.g. START REPLICA, SHOW REPLICA STATUS) are being handled.
@@ -147,21 +158,28 @@ type ReplicaStatus struct {
147158
SourceSsl bool
148159
}
149160

150-
// BinlogReplicaCatalog extends the Catalog interface and provides methods for accessing a BinlogReplicaController
151-
// for a Catalog.
152-
type BinlogReplicaCatalog interface {
153-
// HasBinlogReplicaController returns true if a non-nil BinlogReplicaController is available for this BinlogReplicaCatalog.
161+
// BinlogConsumerProvider provides methods for accessing a BinlogConsumer for BINLOG statement execution and other binlog
162+
// event processing. Typically implemented by sql.Catalog.
163+
type BinlogConsumerProvider interface {
164+
// HasBinlogConsumer returns true if a non-nil BinlogConsumer is available.
165+
HasBinlogConsumer() bool
166+
// GetBinlogConsumer returns the BinlogConsumer.
167+
GetBinlogConsumer() BinlogConsumer
168+
}
169+
170+
// BinlogReplicaProvider provides methods for accessing a BinlogReplicaController for binlog replica operations.
171+
type BinlogReplicaProvider interface {
172+
// HasBinlogReplicaController returns true if a non-nil BinlogReplicaController is available.
154173
HasBinlogReplicaController() bool
155-
// GetBinlogReplicaController returns the BinlogReplicaController registered with this BinlogReplicaCatalog.
174+
// GetBinlogReplicaController returns the BinlogReplicaController.
156175
GetBinlogReplicaController() BinlogReplicaController
157176
}
158177

159-
// BinlogPrimaryCatalog extends the Catalog interface and provides methods for accessing a BinlogPrimaryController
160-
// for a Catalog.
161-
type BinlogPrimaryCatalog interface {
162-
// HasBinlogPrimaryController returns true if a non-nil BinlogPrimaryController is available for this BinlogPrimaryCatalog.
178+
// BinlogPrimaryProvider provides methods for accessing a BinlogPrimaryController for binlog primary operations.
179+
type BinlogPrimaryProvider interface {
180+
// HasBinlogPrimaryController returns true if a non-nil BinlogPrimaryController is available.
163181
HasBinlogPrimaryController() bool
164-
// GetBinlogPrimaryController returns the BinlogPrimaryController registered with this BinlogPrimaryCatalog.
182+
// GetBinlogPrimaryController returns the BinlogPrimaryController.
165183
GetBinlogPrimaryController() BinlogPrimaryController
166184
}
167185

sql/collations.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package sql
1717
import (
1818
"fmt"
1919
"io"
20+
"strconv"
2021
"strings"
2122
"sync"
2223
"unicode/utf8"
@@ -972,3 +973,45 @@ type TypeWithCollation interface {
972973
// whether to include the character set and/or collation information.
973974
StringWithTableCollation(tableCollation CollationID) string
974975
}
976+
977+
// ConvertCollationID converts numeric collation IDs to their string names.
978+
func ConvertCollationID(val any) (string, error) {
979+
var collationID uint64
980+
switch v := val.(type) {
981+
case []byte:
982+
if n, err := strconv.ParseUint(string(v), 10, 64); err == nil {
983+
collationID = n
984+
} else {
985+
return string(v), nil
986+
}
987+
case int8:
988+
collationID = uint64(v)
989+
case int16:
990+
collationID = uint64(v)
991+
case int:
992+
collationID = uint64(v)
993+
case int32:
994+
collationID = uint64(v)
995+
case int64:
996+
collationID = uint64(v)
997+
case uint8:
998+
collationID = uint64(v)
999+
case uint16:
1000+
collationID = uint64(v)
1001+
case uint:
1002+
collationID = uint64(v)
1003+
case uint32:
1004+
collationID = uint64(v)
1005+
case uint64:
1006+
collationID = v
1007+
default:
1008+
return fmt.Sprintf("%v", val), nil
1009+
}
1010+
1011+
if collationID >= uint64(len(collationArray)) {
1012+
return fmt.Sprintf("%v", val), nil
1013+
}
1014+
1015+
collation := CollationID(collationID).Collation()
1016+
return collation.Name, nil
1017+
}

sql/collations_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,61 @@ func testParseCollation(t *testing.T, charset string, collation string, binaryAt
6969
}
7070
})
7171
}
72+
73+
func TestConvertCollationID(t *testing.T) {
74+
tests := []struct {
75+
input any
76+
expected string
77+
}{
78+
{uint64(33), "utf8mb3_general_ci"},
79+
{int64(33), "utf8mb3_general_ci"},
80+
{[]byte("33"), "utf8mb3_general_ci"},
81+
{uint64(8), "latin1_swedish_ci"},
82+
{int32(8), "latin1_swedish_ci"},
83+
84+
{45, "utf8mb4_general_ci"},
85+
{uint64(46), "utf8mb4_bin"},
86+
{255, "utf8mb4_0900_ai_ci"},
87+
{uint64(309), "utf8mb4_0900_bin"},
88+
89+
{83, "utf8mb3_bin"},
90+
{uint64(223), "utf8mb3_general_mysql500_ci"},
91+
92+
{uint64(47), "latin1_bin"},
93+
{48, "latin1_general_ci"},
94+
{49, "latin1_general_cs"},
95+
96+
{uint64(63), "binary"},
97+
98+
{uint64(11), "ascii_general_ci"},
99+
{65, "ascii_bin"},
100+
101+
{uint64(15), "latin1_danish_ci"},
102+
{31, "latin1_german2_ci"},
103+
{94, "latin1_spanish_ci"},
104+
105+
{int8(8), "latin1_swedish_ci"},
106+
{int16(8), "latin1_swedish_ci"},
107+
{int(8), "latin1_swedish_ci"},
108+
{uint8(8), "latin1_swedish_ci"},
109+
{uint16(8), "latin1_swedish_ci"},
110+
{uint(8), "latin1_swedish_ci"},
111+
{uint32(8), "latin1_swedish_ci"},
112+
113+
{"utf8mb4_0900_bin", "utf8mb4_0900_bin"},
114+
{"utf8mb3_general_ci", "utf8mb3_general_ci"},
115+
{"", ""},
116+
117+
{uint64(99999), "99999"},
118+
{uint64(1000), "1000"},
119+
{int(500), "500"},
120+
}
121+
122+
for _, tt := range tests {
123+
t.Run(fmt.Sprintf("%T(%v)", tt.input, tt.input), func(t *testing.T) {
124+
result, err := ConvertCollationID(tt.input)
125+
assert.NoError(t, err)
126+
assert.Equal(t, tt.expected, result)
127+
})
128+
}
129+
}

sql/errors.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -953,6 +953,15 @@ var (
953953

954954
// ErrUnresolvedTableLock is returned when a FOR UPDATE OF clause references a table that doesn't exist in the query context.
955955
ErrUnresolvedTableLock = errors.NewKind("unresolved table name `%s` in locking clause.")
956+
957+
// ErrBase64DecodeError is returned when decoding a base64 string fails.
958+
ErrBase64DecodeError = errors.NewKind("Decoding of base64 string failed")
959+
960+
// ErrNoFormatDescriptionEventBeforeBinlogStatement is returned when a BINLOG statement is not preceded by a format description event.
961+
ErrNoFormatDescriptionEventBeforeBinlogStatement = errors.NewKind("The BINLOG statement of type `%s` was not preceded by a format description BINLOG statement.")
962+
963+
// ErrOnlyFDAndRBREventsAllowedInBinlogStatement is returned when an unsupported event type is used in a BINLOG statement.
964+
ErrOnlyFDAndRBREventsAllowedInBinlogStatement = errors.NewKind("Only Format_description_log_event and row events are allowed in BINLOG statements (but %s was provided)")
956965
)
957966

958967
// CastSQLError returns a *mysql.SQLError with the error code and in some cases, also a SQL state, populated for the
@@ -1034,6 +1043,12 @@ func CastSQLError(err error) *mysql.SQLError {
10341043
// https://en.wikipedia.org/wiki/SQLSTATE
10351044
code = mysql.ERLockDeadlock
10361045
sqlState = mysql.SSLockDeadlock
1046+
case ErrBase64DecodeError.Is(err):
1047+
code = mysql.ERBase64DecodeError
1048+
case ErrNoFormatDescriptionEventBeforeBinlogStatement.Is(err):
1049+
code = mysql.ERNoFormatDescriptionEventBeforeBinlogStatement
1050+
case ErrOnlyFDAndRBREventsAllowedInBinlogStatement.Is(err):
1051+
code = mysql.EROnlyFDAndRBREventsAllowedInBinlogStatement
10371052
default:
10381053
code = mysql.ERUnknownError
10391054
}

sql/plan/binlog.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2025 Dolthub, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package plan
16+
17+
import (
18+
"github.com/dolthub/go-mysql-server/sql"
19+
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
20+
"github.com/dolthub/go-mysql-server/sql/types"
21+
)
22+
23+
// DynamicPrivilege_BinlogAdmin enables binary log control by means of the PURGE BINARY LOGS and BINLOG statements.
24+
// https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_binlog-admin
25+
const DynamicPrivilege_BinlogAdmin = "binlog_admin"
26+
27+
// Binlog replays binary log events, which record database changes in a binary format for efficiency. Tools like
28+
// mysqldump, mysqlbinlog, and mariadb-binlog read these binary events from log files and output them as base64-encoded
29+
// BINLOG statements for replay.
30+
//
31+
// The BINLOG statement execution is delegated to the BinlogConsumer. The base64-encoded event data is decoded
32+
// and passed to the consumer's ProcessEvent method for processing. This allows integrators like Dolt to handle
33+
// BINLOG statement execution using their binlog event processing infrastructure.
34+
//
35+
// See https://dev.mysql.com/doc/refman/8.4/en/binlog.html for the BINLOG statement specification.
36+
type Binlog struct {
37+
Base64Str string
38+
Consumer binlogreplication.BinlogConsumer
39+
}
40+
41+
var _ sql.Node = (*Binlog)(nil)
42+
var _ BinlogConsumerCommand = (*Binlog)(nil)
43+
44+
// NewBinlog creates a new Binlog node.
45+
func NewBinlog(base64Str string) *Binlog {
46+
return &Binlog{
47+
Base64Str: base64Str,
48+
}
49+
}
50+
51+
// WithBinlogConsumer implements the BinlogConsumerCommand interface.
52+
func (b *Binlog) WithBinlogConsumer(consumer binlogreplication.BinlogConsumer) sql.Node {
53+
nc := *b
54+
nc.Consumer = consumer
55+
return &nc
56+
}
57+
58+
func (b *Binlog) String() string {
59+
return "BINLOG"
60+
}
61+
62+
func (b *Binlog) Resolved() bool {
63+
return true
64+
}
65+
66+
func (b *Binlog) Schema() sql.Schema {
67+
return types.OkResultSchema
68+
}
69+
70+
func (b *Binlog) Children() []sql.Node {
71+
return nil
72+
}
73+
74+
func (b *Binlog) IsReadOnly() bool {
75+
return false
76+
}
77+
78+
// WithChildren implements the Node interface.
79+
func (b *Binlog) WithChildren(children ...sql.Node) (sql.Node, error) {
80+
if len(children) != 0 {
81+
return nil, sql.ErrInvalidChildrenNumber.New(b, len(children), 0)
82+
}
83+
return b, nil
84+
}

sql/plan/replication_commands.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,19 @@ var ErrNoReplicationController = errors.NewKind("no replication controller avail
3232
// https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_replication-slave-admin
3333
const DynamicPrivilege_ReplicationSlaveAdmin = "replication_slave_admin"
3434

35+
// DynamicPrivilege_ReplicationApplier is a dynamic privilege that permits executing BINLOG statements.
36+
// See https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_replication-applier
37+
const DynamicPrivilege_ReplicationApplier = "replication_applier"
38+
39+
// BinlogConsumerCommand represents a SQL statement that requires a BinlogConsumer
40+
// (e.g. BINLOG statement).
41+
type BinlogConsumerCommand interface {
42+
sql.Node
43+
44+
// WithBinlogConsumer returns a new instance of this command, with the binlog consumer configured.
45+
WithBinlogConsumer(consumer binlogreplication.BinlogConsumer) sql.Node
46+
}
47+
3548
// BinlogReplicaControllerCommand represents a SQL statement that requires a BinlogReplicaController
3649
// (e.g. Start Replica, Show Replica Status).
3750
type BinlogReplicaControllerCommand interface {
@@ -54,6 +67,11 @@ type BinlogPrimaryControllerCommand interface {
5467

5568
// ChangeReplicationSource is the plan node for the "CHANGE REPLICATION SOURCE TO" statement.
5669
// https://dev.mysql.com/doc/refman/8.0/en/change-replication-source-to.html
70+
//
71+
// TODO: When PRIVILEGE_CHECKS_USER option is specified, validate that the assigned user account has the
72+
// REPLICATION_APPLIER privilege. This validation should happen before the option is passed to the integrator's
73+
// BinlogReplicaController.SetReplicationSourceOptions().
74+
// See https://github.com/mysql/mysql-server/blob/8.0/sql/rpl_replica.cc change_master_cmd
5775
type ChangeReplicationSource struct {
5876
ReplicaController binlogreplication.BinlogReplicaController
5977
Options []binlogreplication.ReplicationOption

sql/planbuilder/auth_default.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/dolthub/go-mysql-server/sql"
2626
"github.com/dolthub/go-mysql-server/sql/mysql_db"
27+
"github.com/dolthub/go-mysql-server/sql/plan"
2728
)
2829

2930
// defaultAuthorizationQueryState contains query-specific state for defaultAuthorizationHandler.
@@ -131,6 +132,10 @@ func (h defaultAuthorizationHandler) HandleAuth(ctx *sql.Context, aqs sql.Author
131132
hasPrivileges = state.db.UserHasPrivileges(ctx, sql.NewPrivilegedOperation(sql.PrivilegeCheckSubject{Database: "mysql"}, sql.PrivilegeType_Update)) ||
132133
state.db.UserHasPrivileges(ctx, sql.NewPrivilegedOperation(sql.PrivilegeCheckSubject{}, sql.PrivilegeType_CreateUser)) ||
133134
state.user.User == auth.TargetNames[0]
135+
case ast.AuthType_BINLOG:
136+
hasPrivileges = state.db.UserHasPrivileges(ctx, sql.NewPrivilegedOperation(sql.PrivilegeCheckSubject{}, sql.PrivilegeType_Super)) ||
137+
state.db.UserHasPrivileges(ctx, sql.NewDynamicPrivilegedOperation(plan.DynamicPrivilege_BinlogAdmin)) ||
138+
state.db.UserHasPrivileges(ctx, sql.NewDynamicPrivilegedOperation(plan.DynamicPrivilege_ReplicationApplier))
134139
case ast.AuthType_CALL:
135140
hasPrivileges, err = h.call(ctx, state, auth)
136141
if err != nil {

0 commit comments

Comments
 (0)