Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions enginetest/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -5031,6 +5031,8 @@ SELECT * FROM cte WHERE d = 2;`,
{"gtid_next", "AUTOMATIC"},
{"gtid_owned", ""},
{"gtid_purged", ""},
{"gtid_domain_id", 0},
{"gtid_seq_no", 0},
},
},
{
Expand Down
15 changes: 13 additions & 2 deletions sql/analyzer/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Catalog struct {
DbProvider sql.DatabaseProvider
AuthHandler sql.AuthorizationHandler

// BinlogConsumer holds an optional consumer that processes binlog events (e.g. for BINLOG statements).
BinlogConsumer binlogreplication.BinlogConsumer
// BinlogReplicaController holds an optional controller that receives forwarded binlog
// replication messages (e.g. "start replica").
BinlogReplicaController binlogreplication.BinlogReplicaController
Expand All @@ -53,8 +55,9 @@ func (c *Catalog) DropDbStats(ctx *sql.Context, db string, flush bool) error {
}

var _ sql.Catalog = (*Catalog)(nil)
var _ binlogreplication.BinlogReplicaCatalog = (*Catalog)(nil)
var _ binlogreplication.BinlogPrimaryCatalog = (*Catalog)(nil)
var _ binlogreplication.BinlogConsumerProvider = (*Catalog)(nil)
var _ binlogreplication.BinlogReplicaProvider = (*Catalog)(nil)
var _ binlogreplication.BinlogPrimaryProvider = (*Catalog)(nil)

type tableLocks map[string]struct{}

Expand All @@ -76,6 +79,14 @@ func NewCatalog(provider sql.DatabaseProvider) *Catalog {
return c
}

func (c *Catalog) HasBinlogConsumer() bool {
return c.BinlogConsumer != nil
}

func (c *Catalog) GetBinlogConsumer() binlogreplication.BinlogConsumer {
return c.BinlogConsumer
}

func (c *Catalog) HasBinlogReplicaController() bool {
return c.BinlogReplicaController != nil
}
Expand Down
38 changes: 28 additions & 10 deletions sql/binlogreplication/binlog_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ import (
"github.com/dolthub/vitess/go/mysql"
)

// BinlogConsumer processes binlog events. This interface can be used by any component that needs to consume
// and apply binlog events, such as BINLOG statement execution, streaming replication, or other binlog processing.
type BinlogConsumer interface {
// ProcessEvent processes a single binlog event.
ProcessEvent(ctx *sql.Context, event mysql.BinlogEvent) error

// HasFormatDescription returns true if a FORMAT_DESCRIPTION_EVENT has been processed.
// This is required before processing TABLE_MAP and row events in BINLOG statements.
HasFormatDescription() bool
}

// BinlogReplicaController allows callers to control a binlog replica. Providers built on go-mysql-server may optionally
// implement this interface and use it when constructing a SQL engine in order to receive callbacks when replication
// statements (e.g. START REPLICA, SHOW REPLICA STATUS) are being handled.
Expand Down Expand Up @@ -147,21 +158,28 @@ type ReplicaStatus struct {
SourceSsl bool
}

// BinlogReplicaCatalog extends the Catalog interface and provides methods for accessing a BinlogReplicaController
// for a Catalog.
type BinlogReplicaCatalog interface {
// HasBinlogReplicaController returns true if a non-nil BinlogReplicaController is available for this BinlogReplicaCatalog.
// BinlogConsumerProvider provides methods for accessing a BinlogConsumer for BINLOG statement execution and other binlog
// event processing. Typically implemented by sql.Catalog.
type BinlogConsumerProvider interface {
// HasBinlogConsumer returns true if a non-nil BinlogConsumer is available.
HasBinlogConsumer() bool
// GetBinlogConsumer returns the BinlogConsumer.
GetBinlogConsumer() BinlogConsumer
}

// BinlogReplicaProvider provides methods for accessing a BinlogReplicaController for binlog replica operations.
type BinlogReplicaProvider interface {
// HasBinlogReplicaController returns true if a non-nil BinlogReplicaController is available.
HasBinlogReplicaController() bool
// GetBinlogReplicaController returns the BinlogReplicaController registered with this BinlogReplicaCatalog.
// GetBinlogReplicaController returns the BinlogReplicaController.
GetBinlogReplicaController() BinlogReplicaController
}

// BinlogPrimaryCatalog extends the Catalog interface and provides methods for accessing a BinlogPrimaryController
// for a Catalog.
type BinlogPrimaryCatalog interface {
// HasBinlogPrimaryController returns true if a non-nil BinlogPrimaryController is available for this BinlogPrimaryCatalog.
// BinlogPrimaryProvider provides methods for accessing a BinlogPrimaryController for binlog primary operations.
type BinlogPrimaryProvider interface {
// HasBinlogPrimaryController returns true if a non-nil BinlogPrimaryController is available.
HasBinlogPrimaryController() bool
// GetBinlogPrimaryController returns the BinlogPrimaryController registered with this BinlogPrimaryCatalog.
// GetBinlogPrimaryController returns the BinlogPrimaryController.
GetBinlogPrimaryController() BinlogPrimaryController
}

Expand Down
43 changes: 43 additions & 0 deletions sql/collations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sql
import (
"fmt"
"io"
"strconv"
"strings"
"sync"
"unicode/utf8"
Expand Down Expand Up @@ -972,3 +973,45 @@ type TypeWithCollation interface {
// whether to include the character set and/or collation information.
StringWithTableCollation(tableCollation CollationID) string
}

// ConvertCollationID converts numeric collation IDs to their string names.
func ConvertCollationID(val any) (string, error) {
var collationID uint64
switch v := val.(type) {
case []byte:
if n, err := strconv.ParseUint(string(v), 10, 64); err == nil {
collationID = n
} else {
return string(v), nil
}
case int8:
collationID = uint64(v)
case int16:
collationID = uint64(v)
case int:
collationID = uint64(v)
case int32:
collationID = uint64(v)
case int64:
collationID = uint64(v)
case uint8:
collationID = uint64(v)
case uint16:
collationID = uint64(v)
case uint:
collationID = uint64(v)
case uint32:
collationID = uint64(v)
case uint64:
collationID = v
default:
return fmt.Sprintf("%v", val), nil
}

if collationID >= uint64(len(collationArray)) {
return fmt.Sprintf("%v", val), nil
}

collation := CollationID(collationID).Collation()
return collation.Name, nil
}
58 changes: 58 additions & 0 deletions sql/collations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,61 @@ func testParseCollation(t *testing.T, charset string, collation string, binaryAt
}
})
}

func TestConvertCollationID(t *testing.T) {
tests := []struct {
input any
expected string
}{
{uint64(33), "utf8mb3_general_ci"},
{int64(33), "utf8mb3_general_ci"},
{[]byte("33"), "utf8mb3_general_ci"},
{uint64(8), "latin1_swedish_ci"},
{int32(8), "latin1_swedish_ci"},

{45, "utf8mb4_general_ci"},
{uint64(46), "utf8mb4_bin"},
{255, "utf8mb4_0900_ai_ci"},
{uint64(309), "utf8mb4_0900_bin"},

{83, "utf8mb3_bin"},
{uint64(223), "utf8mb3_general_mysql500_ci"},

{uint64(47), "latin1_bin"},
{48, "latin1_general_ci"},
{49, "latin1_general_cs"},

{uint64(63), "binary"},

{uint64(11), "ascii_general_ci"},
{65, "ascii_bin"},

{uint64(15), "latin1_danish_ci"},
{31, "latin1_german2_ci"},
{94, "latin1_spanish_ci"},

{int8(8), "latin1_swedish_ci"},
{int16(8), "latin1_swedish_ci"},
{int(8), "latin1_swedish_ci"},
{uint8(8), "latin1_swedish_ci"},
{uint16(8), "latin1_swedish_ci"},
{uint(8), "latin1_swedish_ci"},
{uint32(8), "latin1_swedish_ci"},

{"utf8mb4_0900_bin", "utf8mb4_0900_bin"},
{"utf8mb3_general_ci", "utf8mb3_general_ci"},
{"", ""},

{uint64(99999), "99999"},
{uint64(1000), "1000"},
{int(500), "500"},
}

for _, tt := range tests {
t.Run(fmt.Sprintf("%T(%v)", tt.input, tt.input), func(t *testing.T) {
result, err := ConvertCollationID(tt.input)
assert.NoError(t, err)
assert.Equal(t, tt.expected, result)
})
}
}
15 changes: 15 additions & 0 deletions sql/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,15 @@ var (

// ErrUnresolvedTableLock is returned when a FOR UPDATE OF clause references a table that doesn't exist in the query context.
ErrUnresolvedTableLock = errors.NewKind("unresolved table name `%s` in locking clause.")

// ErrBase64DecodeError is returned when decoding a base64 string fails.
ErrBase64DecodeError = errors.NewKind("Decoding of base64 string failed")

// ErrNoFormatDescriptionEventBeforeBinlogStatement is returned when a BINLOG statement is not preceded by a format description event.
ErrNoFormatDescriptionEventBeforeBinlogStatement = errors.NewKind("The BINLOG statement of type `%s` was not preceded by a format description BINLOG statement.")

// ErrOnlyFDAndRBREventsAllowedInBinlogStatement is returned when an unsupported event type is used in a BINLOG statement.
ErrOnlyFDAndRBREventsAllowedInBinlogStatement = errors.NewKind("Only Format_description_log_event and row events are allowed in BINLOG statements (but %s was provided)")
)

// CastSQLError returns a *mysql.SQLError with the error code and in some cases, also a SQL state, populated for the
Expand Down Expand Up @@ -1034,6 +1043,12 @@ func CastSQLError(err error) *mysql.SQLError {
// https://en.wikipedia.org/wiki/SQLSTATE
code = mysql.ERLockDeadlock
sqlState = mysql.SSLockDeadlock
case ErrBase64DecodeError.Is(err):
code = mysql.ERBase64DecodeError
case ErrNoFormatDescriptionEventBeforeBinlogStatement.Is(err):
code = mysql.ERNoFormatDescriptionEventBeforeBinlogStatement
case ErrOnlyFDAndRBREventsAllowedInBinlogStatement.Is(err):
code = mysql.EROnlyFDAndRBREventsAllowedInBinlogStatement
default:
code = mysql.ERUnknownError
}
Expand Down
84 changes: 84 additions & 0 deletions sql/plan/binlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package plan

import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
"github.com/dolthub/go-mysql-server/sql/types"
)

// DynamicPrivilege_BinlogAdmin enables binary log control by means of the PURGE BINARY LOGS and BINLOG statements.
// https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_binlog-admin
const DynamicPrivilege_BinlogAdmin = "binlog_admin"

// Binlog replays binary log events, which record database changes in a binary format for efficiency. Tools like
// mysqldump, mysqlbinlog, and mariadb-binlog read these binary events from log files and output them as base64-encoded
// BINLOG statements for replay.
//
// The BINLOG statement execution is delegated to the BinlogConsumer. The base64-encoded event data is decoded
// and passed to the consumer's ProcessEvent method for processing. This allows integrators like Dolt to handle
// BINLOG statement execution using their binlog event processing infrastructure.
//
// See https://dev.mysql.com/doc/refman/8.4/en/binlog.html for the BINLOG statement specification.
type Binlog struct {
Base64Str string
Consumer binlogreplication.BinlogConsumer
}

var _ sql.Node = (*Binlog)(nil)
var _ BinlogConsumerCommand = (*Binlog)(nil)

// NewBinlog creates a new Binlog node.
func NewBinlog(base64Str string) *Binlog {
return &Binlog{
Base64Str: base64Str,
}
}

// WithBinlogConsumer implements the BinlogConsumerCommand interface.
func (b *Binlog) WithBinlogConsumer(consumer binlogreplication.BinlogConsumer) sql.Node {
nc := *b
nc.Consumer = consumer
return &nc
}

func (b *Binlog) String() string {
return "BINLOG"
}

func (b *Binlog) Resolved() bool {
return true
}

func (b *Binlog) Schema() sql.Schema {
return types.OkResultSchema
}

func (b *Binlog) Children() []sql.Node {
return nil
}

func (b *Binlog) IsReadOnly() bool {
return false
}

// WithChildren implements the Node interface.
func (b *Binlog) WithChildren(children ...sql.Node) (sql.Node, error) {
if len(children) != 0 {
return nil, sql.ErrInvalidChildrenNumber.New(b, len(children), 0)
}
return b, nil
}
18 changes: 18 additions & 0 deletions sql/plan/replication_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ var ErrNoReplicationController = errors.NewKind("no replication controller avail
// https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_replication-slave-admin
const DynamicPrivilege_ReplicationSlaveAdmin = "replication_slave_admin"

// DynamicPrivilege_ReplicationApplier is a dynamic privilege that permits executing BINLOG statements.
// See https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_replication-applier
const DynamicPrivilege_ReplicationApplier = "replication_applier"

// BinlogConsumerCommand represents a SQL statement that requires a BinlogConsumer
// (e.g. BINLOG statement).
type BinlogConsumerCommand interface {
sql.Node

// WithBinlogConsumer returns a new instance of this command, with the binlog consumer configured.
WithBinlogConsumer(consumer binlogreplication.BinlogConsumer) sql.Node
}

// BinlogReplicaControllerCommand represents a SQL statement that requires a BinlogReplicaController
// (e.g. Start Replica, Show Replica Status).
type BinlogReplicaControllerCommand interface {
Expand All @@ -54,6 +67,11 @@ type BinlogPrimaryControllerCommand interface {

// ChangeReplicationSource is the plan node for the "CHANGE REPLICATION SOURCE TO" statement.
// https://dev.mysql.com/doc/refman/8.0/en/change-replication-source-to.html
//
// TODO: When PRIVILEGE_CHECKS_USER option is specified, validate that the assigned user account has the
// REPLICATION_APPLIER privilege. This validation should happen before the option is passed to the integrator's
// BinlogReplicaController.SetReplicationSourceOptions().
// See https://github.com/mysql/mysql-server/blob/8.0/sql/rpl_replica.cc change_master_cmd
type ChangeReplicationSource struct {
ReplicaController binlogreplication.BinlogReplicaController
Options []binlogreplication.ReplicationOption
Expand Down
5 changes: 5 additions & 0 deletions sql/planbuilder/auth_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
"github.com/dolthub/go-mysql-server/sql/plan"
)

// defaultAuthorizationQueryState contains query-specific state for defaultAuthorizationHandler.
Expand Down Expand Up @@ -131,6 +132,10 @@ func (h defaultAuthorizationHandler) HandleAuth(ctx *sql.Context, aqs sql.Author
hasPrivileges = state.db.UserHasPrivileges(ctx, sql.NewPrivilegedOperation(sql.PrivilegeCheckSubject{Database: "mysql"}, sql.PrivilegeType_Update)) ||
state.db.UserHasPrivileges(ctx, sql.NewPrivilegedOperation(sql.PrivilegeCheckSubject{}, sql.PrivilegeType_CreateUser)) ||
state.user.User == auth.TargetNames[0]
case ast.AuthType_BINLOG:
hasPrivileges = state.db.UserHasPrivileges(ctx, sql.NewPrivilegedOperation(sql.PrivilegeCheckSubject{}, sql.PrivilegeType_Super)) ||
state.db.UserHasPrivileges(ctx, sql.NewDynamicPrivilegedOperation(plan.DynamicPrivilege_BinlogAdmin)) ||
state.db.UserHasPrivileges(ctx, sql.NewDynamicPrivilegedOperation(plan.DynamicPrivilege_ReplicationApplier))
case ast.AuthType_CALL:
hasPrivileges, err = h.call(ctx, state, auth)
if err != nil {
Expand Down
Loading
Loading