Skip to content

Commit b14ed9b

Browse files
authored
Merge pull request #453 from dolthub/fulghum/binlog_row_metadata_full
Add support for serializing optional table map metadata
2 parents bd171d3 + aa4bb07 commit b14ed9b

File tree

3 files changed

+241
-9
lines changed

3 files changed

+241
-9
lines changed

go/mysql/binlog_event.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,36 @@ type TableMap struct {
205205
// - If the metadata is one byte, only the lower 8 bits are used.
206206
// - If the metadata is two bytes, all 16 bits are used.
207207
Metadata []uint16
208+
209+
// OptionalColumnNames contains optional metadata on column names for
210+
// the table this TableMap describes. This metadata should only be set
211+
// when @@binlog_row_metadata is set to FULL.
212+
OptionalColumnNames []string
213+
214+
// OptionalColumnCollations contains optional metadata on column collations
215+
// for the table this TableMap describes. This metadata should only be set
216+
// when @@binlog_row_metadata is set to FULL.
217+
OptionalColumnCollations []uint64
218+
219+
// OptionalEnumValues contains one []string for each enum field in a table's
220+
// schema, where that []string contains the string values of the enum. This
221+
// metadata should only be set when @@binlog_row_metadata is set to FULL.
222+
OptionalEnumValues [][]string
223+
224+
// OptionalSetValues contains one []string for each set field in a table's
225+
// schema, where that []string contains the string values of the set. This
226+
// metadata should only be set when @@binlog_row_metadata is set to FULL.
227+
OptionalSetValues [][]string
228+
229+
// OptionalEnumAndSetCollations contains one entry for each set and enum
230+
// field in the table's schema, indicating the set or enum's collation.
231+
OptionalEnumAndSetCollations []uint64
208232
}
209233

210234
// String implements the Stringer interface
211235
func (t *TableMap) String() string {
212-
return fmt.Sprintf("{Flags: %v, Database: %q, Name: %q, Types: %v, CanBeNull: %v, Metadata: %v}",
213-
t.Flags, t.Database, t.Name, t.Types, t.CanBeNull, t.Metadata)
236+
return fmt.Sprintf("{Flags: %v, Database: %q, Name: %q, Types: %v, CanBeNull: %v, Metadata: %v, OptionalColumnNames: %v, OptionalColumnCollations: %v, OptionalEnumValues: %v, OptionalSetValues: %v, OptionalEnumAndSetCollations: %v}",
237+
t.Flags, t.Database, t.Name, t.Types, t.CanBeNull, t.Metadata, t.OptionalColumnNames, t.OptionalColumnCollations, t.OptionalEnumValues, t.OptionalSetValues, t.OptionalEnumAndSetCollations)
214238
}
215239

216240
// Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT.

go/mysql/binlog_event_make.go

Lines changed: 147 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package mysql
1818

1919
import (
2020
"encoding/binary"
21+
"fmt"
2122
"hash/crc32"
2223
)
2324

@@ -364,9 +365,143 @@ func NewMySQLGTIDEvent(f BinlogFormat, m BinlogEventMetadata, gtid Mysql56GTID,
364365
return NewMysql56BinlogEvent(ev)
365366
}
366367

367-
// NewTableMapEvent returns a TableMap event.
368+
// Type identifiers for the blocks that make up the optional metadata section of a
// table map event. Each block on the wire starts with one of these one-byte values,
// which tells the reader what kind of data the block's payload holds.
const (
	// TableMapOptMetaColumnCharset tags the block holding column collation IDs.
	TableMapOptMetaColumnCharset = 0x03

	// TableMapOptMetaColumnName tags the block holding column names.
	TableMapOptMetaColumnName = 0x04

	// TableMapOptMetaSetValues tags the block holding the string values of set columns.
	TableMapOptMetaSetValues = 0x05

	// TableMapOptMetaEnumValues tags the block holding the string values of enum columns.
	TableMapOptMetaEnumValues = 0x06

	// TableMapOptMetaEnumSetCharset tags the block holding collation IDs of enum and set columns.
	TableMapOptMetaEnumSetCharset = 0x0b
)
377+
378+
// buildOptionalTableMapMetadata takes the optional metadata (e.g. column names, column collation IDs, enum and
379+
// set names, and enum and set collation IDs) from |tableMap| and encodes it into the binary format needed to
380+
// transmit this metadata to a remote MySQL replica.
381+
func buildOptionalTableMapMetadata(tableMap *TableMap) ([]byte, error) {
382+
columnNames := tableMap.OptionalColumnNames
383+
columnCollationIds := tableMap.OptionalColumnCollations
384+
385+
if len(columnNames) == 0 || len(columnCollationIds) == 0 {
386+
return nil, nil
387+
}
388+
389+
if len(columnNames) != len(columnCollationIds) {
390+
return nil, fmt.Errorf("len mismatch: %d columnNames vs %d collationIDs", len(columnNames), len(columnCollationIds))
391+
}
392+
393+
// 1) Build COLUMN_CHARSET payload
394+
var charsetPayload []byte
395+
for _, coll := range columnCollationIds {
396+
charsetPayload = append(charsetPayload, encodeLenEncInt(coll)...)
397+
}
398+
399+
// 2) Build COLUMN_NAME payload
400+
var namePayload []byte
401+
for _, n := range columnNames {
402+
if len(n) > 255 {
403+
return nil, fmt.Errorf("column name too long (>255): %q", n)
404+
}
405+
namePayload = append(namePayload, byte(len(n)))
406+
namePayload = append(namePayload, []byte(n)...)
407+
}
408+
409+
// 3) Build ENUM VALUES payload
410+
var enumValuesPayload []byte
411+
for _, enumValues := range tableMap.OptionalEnumValues {
412+
enumValuesPayload = append(enumValuesPayload, encodeLenEncInt(uint64(len(enumValues)))...)
413+
for _, enumValue := range enumValues {
414+
enumValuesPayload = append(enumValuesPayload, encodeLenEncInt(uint64(len(enumValue)))...)
415+
enumValuesPayload = append(enumValuesPayload, []byte(enumValue)...)
416+
}
417+
}
418+
419+
// 4) Build SET VALUES payload
420+
var setValuesPayload []byte
421+
for _, setValues := range tableMap.OptionalSetValues {
422+
setValuesPayload = append(setValuesPayload, encodeLenEncInt(uint64(len(setValues)))...)
423+
for _, setValue := range setValues {
424+
setValuesPayload = append(setValuesPayload, encodeLenEncInt(uint64(len(setValue)))...)
425+
setValuesPayload = append(setValuesPayload, []byte(setValue)...)
426+
}
427+
}
428+
429+
// 5) Build ENUM/SET CHARSET payload
430+
var enumSetCharsetPayload []byte
431+
for _, coll := range tableMap.OptionalEnumAndSetCollations {
432+
enumSetCharsetPayload = append(enumSetCharsetPayload, encodeLenEncInt(coll)...)
433+
}
434+
435+
// 6) Wrap each payload as an optional metadata block: [type][lenenc-int][payload]
436+
var out []byte
437+
out = append(out, buildOptMetaBlock(TableMapOptMetaColumnCharset, charsetPayload)...)
438+
out = append(out, buildOptMetaBlock(TableMapOptMetaColumnName, namePayload)...)
439+
out = append(out, buildOptMetaBlock(TableMapOptMetaEnumValues, enumValuesPayload)...)
440+
out = append(out, buildOptMetaBlock(TableMapOptMetaSetValues, setValuesPayload)...)
441+
out = append(out, buildOptMetaBlock(TableMapOptMetaEnumSetCharset, enumSetCharsetPayload)...)
442+
443+
return out, nil
444+
}
445+
446+
// buildOptMetaBlock constructs a single optional metadata block in the MySQL
447+
// binlog row event format.
448+
//
449+
// The block is encoded as:
450+
//
451+
// [type][length][payload]
452+
//
453+
// where:
454+
// - type is a one-byte identifier indicating the metadata subtype,
455+
// - length is a length-encoded integer representing the size of payload in bytes,
456+
// - payload is the raw metadata content for that subtype.
457+
//
458+
// The returned byte slice contains the fully encoded metadata block and is suitable
459+
// for concatenation with other optional metadata blocks when building the
460+
// optional_metadata section of a binlog event.
461+
func buildOptMetaBlock(typ byte, payload []byte) []byte {
462+
var b []byte
463+
b = append(b, typ)
464+
b = append(b, encodeLenEncInt(uint64(len(payload)))...)
465+
b = append(b, payload...)
466+
return b
467+
}
468+
469+
// encodeLenEncInt encodes MySQL "length-encoded integer" (a.k.a. lenenc-int).
470+
//
471+
// Encoding:
472+
// - < 251: 1 byte
473+
// - < 2^16: 0xFC + 2 bytes little-endian
474+
// - < 2^24: 0xFD + 3 bytes little-endian
475+
// - else: 0xFE + 8 bytes little-endian
476+
func encodeLenEncInt(x uint64) []byte {
477+
switch {
478+
case x < 251:
479+
return []byte{byte(x)}
480+
case x < 1<<16:
481+
b := make([]byte, 3)
482+
b[0] = 0xFC
483+
binary.LittleEndian.PutUint16(b[1:], uint16(x))
484+
return b
485+
case x < 1<<24:
486+
// 0xFD + 3 bytes little endian
487+
return []byte{0xFD, byte(x), byte(x >> 8), byte(x >> 16)}
488+
default:
489+
b := make([]byte, 9)
490+
b[0] = 0xFE
491+
binary.LittleEndian.PutUint64(b[1:], x)
492+
return b
493+
}
494+
}
495+
496+
// NewTableMapEvent returns a TableMap event. If any errors are encountered while building the
497+
// event bytes, an error is returned.
368498
// Only works with post_header_length=8.
369-
func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm *TableMap) BinlogEvent {
499+
func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm *TableMap) (BinlogEvent, error) {
500+
optionalMetadata, err := buildOptionalTableMapMetadata(tm)
501+
if err != nil {
502+
return nil, err
503+
}
504+
370505
if f.HeaderSize(eTableMapEvent) != 8 {
371506
panic("Not implemented, post_header_length!=8")
372507
}
@@ -385,7 +520,9 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
385520
len(tm.Types) +
386521
lenEncIntSize(uint64(metadataLength)) + // lenenc-str column-meta-def
387522
metadataLength +
388-
len(tm.CanBeNull.data)
523+
len(tm.CanBeNull.data) +
524+
len(optionalMetadata)
525+
389526
data := make([]byte, length)
390527

391528
data[0] = byte(tableID)
@@ -397,9 +534,11 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
397534
data[6] = byte(tm.Flags)
398535
data[7] = byte(tm.Flags >> 8)
399536
data[8] = byte(len(tm.Database))
537+
400538
pos := 6 + 2 + 1 + copy(data[9:], tm.Database)
401539
data[pos] = 0
402540
pos++
541+
403542
data[pos] = byte(len(tm.Name))
404543
pos += 1 + copy(data[pos+1:], tm.Name)
405544
data[pos] = 0
@@ -414,12 +553,15 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
414553
}
415554

416555
pos += copy(data[pos:], tm.CanBeNull.data)
556+
pos += copy(data[pos:], optionalMetadata)
557+
417558
if pos != len(data) {
418-
panic("bad encoding")
559+
return nil, fmt.Errorf("bad table map encoding; calculated position (%v) "+
560+
"does not match length of data (%v)", pos, len(data))
419561
}
420562

421563
ev := packetize(f, eTableMapEvent, 0, data, m)
422-
return NewMariadbBinlogEvent(ev)
564+
return NewMariadbBinlogEvent(ev), nil
423565
}
424566

425567
// NewWriteRowsEvent returns a WriteRows event. Uses v2.

go/mysql/binlog_event_make_test.go

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,15 +326,16 @@ func TestTableMapEvent(t *testing.T) {
326326
tm.CanBeNull.Set(5, true)
327327
tm.CanBeNull.Set(9, true)
328328

329-
event := NewTableMapEvent(f, m, 0x102030405060, tm)
329+
event, err := NewTableMapEvent(f, m, 0x102030405060, tm)
330+
require.NoError(t, err)
330331
if !event.IsValid() {
331332
t.Fatalf("NewTableMapEvent().IsValid() is false")
332333
}
333334
if !event.IsTableMap() {
334335
t.Fatalf("NewTableMapEvent().IsTableMap() if false")
335336
}
336337

337-
event, _, err := event.StripChecksum(f)
338+
event, _, err = event.StripChecksum(f)
338339
if err != nil {
339340
t.Fatalf("StripChecksum failed: %v", err)
340341
}
@@ -352,6 +353,71 @@ func TestTableMapEvent(t *testing.T) {
352353
}
353354
}
354355

356+
// Test serialization of TableMap events that contain optional metadata (e.g. column names, enum values).
357+
func TestTableMapEventWithOptionalMetadata(t *testing.T) {
358+
f := NewMySQL56BinlogFormat()
359+
m := NewTestBinlogMetadata()
360+
361+
tm := &TableMap{
362+
Flags: 0x8090,
363+
Database: "my_database",
364+
Name: "my_table",
365+
Types: []byte{
366+
TypeLongLong,
367+
TypeLongLong,
368+
TypeLongLong,
369+
},
370+
CanBeNull: NewServerBitmap(10),
371+
Metadata: []uint16{
372+
0,
373+
0,
374+
0,
375+
},
376+
OptionalEnumValues: [][]string{{"apple", "orange"}, {"red", "green"}},
377+
OptionalSetValues: [][]string{{"one", "two", "three"}},
378+
OptionalColumnNames: []string{"foo", "bar", "baz"},
379+
OptionalColumnCollations: []uint64{0, 0, 0},
380+
OptionalEnumAndSetCollations: []uint64{0, 0, 0},
381+
}
382+
tm.CanBeNull.Set(1, true)
383+
tm.CanBeNull.Set(2, true)
384+
tm.CanBeNull.Set(5, true)
385+
tm.CanBeNull.Set(9, true)
386+
387+
event, err := NewTableMapEvent(f, m, 0x102030405060, tm)
388+
require.NoError(t, err)
389+
if !event.IsValid() {
390+
t.Fatalf("NewTableMapEvent().IsValid() is false")
391+
}
392+
if !event.IsTableMap() {
393+
t.Fatalf("NewTableMapEvent().IsTableMap() if false")
394+
}
395+
396+
event, _, err = event.StripChecksum(f)
397+
if err != nil {
398+
t.Fatalf("StripChecksum failed: %v", err)
399+
}
400+
401+
tableID := event.TableID(f)
402+
if tableID != 0x102030405060 {
403+
t.Fatalf("NewTableMapEvent().TableID returned %x", tableID)
404+
}
405+
406+
// NOTE: Vitess doesn't currently include support for deserializing optional table map data (only serializing it)
407+
// so instead of doing round-trip testing of the values, we use static expected bytes that we know have been
408+
// serialized and work correctly with replication clients.
409+
var expectedBytes = []byte{
410+
0x98, 0x68, 0xe9, 0x53, 0x13, 0x1, 0x0, 0x0, 0x0, 0x81, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x60,
411+
0x50, 0x40, 0x30, 0x20, 0x10, 0x90, 0x80, 0xb, 0x6d, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73,
412+
0x65, 0x0, 0x8, 0x6d, 0x79, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x0, 0x3, 0x8, 0x8, 0x8, 0x0, 0x26, 0x2,
413+
0x3, 0x3, 0x0, 0x0, 0x0, 0x4, 0xc, 0x3, 0x66, 0x6f, 0x6f, 0x3, 0x62, 0x61, 0x72, 0x3, 0x62, 0x61, 0x7a,
414+
0x6, 0x19, 0x2, 0x5, 0x61, 0x70, 0x70, 0x6c, 0x65, 0x6, 0x6f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x2, 0x3,
415+
0x72, 0x65, 0x64, 0x5, 0x67, 0x72, 0x65, 0x65, 0x6e, 0x5, 0xf, 0x3, 0x3, 0x6f, 0x6e, 0x65, 0x3, 0x74,
416+
0x77, 0x6f, 0x5, 0x74, 0x68, 0x72, 0x65, 0x65, 0xb, 0x3, 0x0, 0x0, 0x0,
417+
}
418+
require.Equal(t, expectedBytes, event.Bytes())
419+
}
420+
355421
func TestRowsEvent(t *testing.T) {
356422
f := NewMySQL56BinlogFormat()
357423
m := NewTestBinlogMetadata()

0 commit comments

Comments
 (0)