@@ -18,6 +18,7 @@ package mysql
1818
1919import (
2020 "encoding/binary"
21+ "fmt"
2122 "hash/crc32"
2223)
2324
@@ -364,9 +365,143 @@ func NewMySQLGTIDEvent(f BinlogFormat, m BinlogEventMetadata, gtid Mysql56GTID,
364365 return NewMysql56BinlogEvent (ev )
365366}
366367
367- // NewTableMapEvent returns a TableMap event.
368+ // TableMap optional metadata field types. These identifiers are used in the wire protocol to indicate
369+ // different sections of data in the optional table map metadata.
370+ const (
371+ TableMapOptMetaColumnCharset = 0x03
372+ TableMapOptMetaColumnName = 0x04
373+ TableMapOptMetaSetValues = 0x05
374+ TableMapOptMetaEnumValues = 0x06
375+ TableMapOptMetaEnumSetCharset = 0x0b
376+ )
377+
378+ // buildOptionalTableMapMetadata takes the optional metadata (e.g. column names, column collation IDs, enum and
379+ // set names, and enum and set collation IDs) from |tableMap| and encodes it into the binary format needed to
380+ // transmit this metadata to a remote MySQL replica.
381+ func buildOptionalTableMapMetadata (tableMap * TableMap ) ([]byte , error ) {
382+ columnNames := tableMap .OptionalColumnNames
383+ columnCollationIds := tableMap .OptionalColumnCollations
384+
385+ if len (columnNames ) == 0 || len (columnCollationIds ) == 0 {
386+ return nil , nil
387+ }
388+
389+ if len (columnNames ) != len (columnCollationIds ) {
390+ return nil , fmt .Errorf ("len mismatch: %d columnNames vs %d collationIDs" , len (columnNames ), len (columnCollationIds ))
391+ }
392+
393+ // 1) Build COLUMN_CHARSET payload
394+ var charsetPayload []byte
395+ for _ , coll := range columnCollationIds {
396+ charsetPayload = append (charsetPayload , encodeLenEncInt (coll )... )
397+ }
398+
399+ // 2) Build COLUMN_NAME payload
400+ var namePayload []byte
401+ for _ , n := range columnNames {
402+ if len (n ) > 255 {
403+ return nil , fmt .Errorf ("column name too long (>255): %q" , n )
404+ }
405+ namePayload = append (namePayload , byte (len (n )))
406+ namePayload = append (namePayload , []byte (n )... )
407+ }
408+
409+ // 3) Build ENUM VALUES payload
410+ var enumValuesPayload []byte
411+ for _ , enumValues := range tableMap .OptionalEnumValues {
412+ enumValuesPayload = append (enumValuesPayload , encodeLenEncInt (uint64 (len (enumValues )))... )
413+ for _ , enumValue := range enumValues {
414+ enumValuesPayload = append (enumValuesPayload , encodeLenEncInt (uint64 (len (enumValue )))... )
415+ enumValuesPayload = append (enumValuesPayload , []byte (enumValue )... )
416+ }
417+ }
418+
419+ // 4) Build SET VALUES payload
420+ var setValuesPayload []byte
421+ for _ , setValues := range tableMap .OptionalSetValues {
422+ setValuesPayload = append (setValuesPayload , encodeLenEncInt (uint64 (len (setValues )))... )
423+ for _ , setValue := range setValues {
424+ setValuesPayload = append (setValuesPayload , encodeLenEncInt (uint64 (len (setValue )))... )
425+ setValuesPayload = append (setValuesPayload , []byte (setValue )... )
426+ }
427+ }
428+
429+ // 5) Build ENUM/SET CHARSET payload
430+ var enumSetCharsetPayload []byte
431+ for _ , coll := range tableMap .OptionalEnumAndSetCollations {
432+ enumSetCharsetPayload = append (enumSetCharsetPayload , encodeLenEncInt (coll )... )
433+ }
434+
435+ // 6) Wrap each payload as an optional metadata block: [type][lenenc-int][payload]
436+ var out []byte
437+ out = append (out , buildOptMetaBlock (TableMapOptMetaColumnCharset , charsetPayload )... )
438+ out = append (out , buildOptMetaBlock (TableMapOptMetaColumnName , namePayload )... )
439+ out = append (out , buildOptMetaBlock (TableMapOptMetaEnumValues , enumValuesPayload )... )
440+ out = append (out , buildOptMetaBlock (TableMapOptMetaSetValues , setValuesPayload )... )
441+ out = append (out , buildOptMetaBlock (TableMapOptMetaEnumSetCharset , enumSetCharsetPayload )... )
442+
443+ return out , nil
444+ }
445+
446+ // buildOptMetaBlock constructs a single optional metadata block in the MySQL
447+ // binlog row event format.
448+ //
449+ // The block is encoded as:
450+ //
451+ // [type][length][payload]
452+ //
453+ // where:
454+ // - type is a one-byte identifier indicating the metadata subtype,
455+ // - length is a length-encoded integer representing the size of payload in bytes,
456+ // - payload is the raw metadata content for that subtype.
457+ //
458+ // The returned byte slice contains the fully encoded metadata block and is suitable
459+ // for concatenation with other optional metadata blocks when building the
460+ // optional_metadata section of a binlog event.
461+ func buildOptMetaBlock (typ byte , payload []byte ) []byte {
462+ var b []byte
463+ b = append (b , typ )
464+ b = append (b , encodeLenEncInt (uint64 (len (payload )))... )
465+ b = append (b , payload ... )
466+ return b
467+ }
468+
469+ // encodeLenEncInt encodes MySQL "length-encoded integer" (a.k.a. lenenc-int).
470+ //
471+ // Encoding:
472+ // - < 251: 1 byte
473+ // - < 2^16: 0xFC + 2 bytes little-endian
474+ // - < 2^24: 0xFD + 3 bytes little-endian
475+ // - else: 0xFE + 8 bytes little-endian
476+ func encodeLenEncInt (x uint64 ) []byte {
477+ switch {
478+ case x < 251 :
479+ return []byte {byte (x )}
480+ case x < 1 << 16 :
481+ b := make ([]byte , 3 )
482+ b [0 ] = 0xFC
483+ binary .LittleEndian .PutUint16 (b [1 :], uint16 (x ))
484+ return b
485+ case x < 1 << 24 :
486+ // 0xFD + 3 bytes little endian
487+ return []byte {0xFD , byte (x ), byte (x >> 8 ), byte (x >> 16 )}
488+ default :
489+ b := make ([]byte , 9 )
490+ b [0 ] = 0xFE
491+ binary .LittleEndian .PutUint64 (b [1 :], x )
492+ return b
493+ }
494+ }
495+
496+ // NewTableMapEvent returns a TableMap event. If any errors are encountered while building the
497+ // event bytes, an error is returned.
368498// Only works with post_header_length=8.
369- func NewTableMapEvent (f BinlogFormat , m BinlogEventMetadata , tableID uint64 , tm * TableMap ) BinlogEvent {
499+ func NewTableMapEvent (f BinlogFormat , m BinlogEventMetadata , tableID uint64 , tm * TableMap ) (BinlogEvent , error ) {
500+ optionalMetadata , err := buildOptionalTableMapMetadata (tm )
501+ if err != nil {
502+ return nil , err
503+ }
504+
370505 if f .HeaderSize (eTableMapEvent ) != 8 {
371506 panic ("Not implemented, post_header_length!=8" )
372507 }
@@ -385,7 +520,9 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
385520 len (tm .Types ) +
386521 lenEncIntSize (uint64 (metadataLength )) + // lenenc-str column-meta-def
387522 metadataLength +
388- len (tm .CanBeNull .data )
523+ len (tm .CanBeNull .data ) +
524+ len (optionalMetadata )
525+
389526 data := make ([]byte , length )
390527
391528 data [0 ] = byte (tableID )
@@ -397,9 +534,11 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
397534 data [6 ] = byte (tm .Flags )
398535 data [7 ] = byte (tm .Flags >> 8 )
399536 data [8 ] = byte (len (tm .Database ))
537+
400538 pos := 6 + 2 + 1 + copy (data [9 :], tm .Database )
401539 data [pos ] = 0
402540 pos ++
541+
403542 data [pos ] = byte (len (tm .Name ))
404543 pos += 1 + copy (data [pos + 1 :], tm .Name )
405544 data [pos ] = 0
@@ -414,12 +553,15 @@ func NewTableMapEvent(f BinlogFormat, m BinlogEventMetadata, tableID uint64, tm
414553 }
415554
416555 pos += copy (data [pos :], tm .CanBeNull .data )
556+ pos += copy (data [pos :], optionalMetadata )
557+
417558 if pos != len (data ) {
418- panic ("bad encoding" )
559+ return nil , fmt .Errorf ("bad table map encoding; calculated position (%v) " +
560+ "does not match length of data (%v)" , pos , len (data ))
419561 }
420562
421563 ev := packetize (f , eTableMapEvent , 0 , data , m )
422- return NewMariadbBinlogEvent (ev )
564+ return NewMariadbBinlogEvent (ev ), nil
423565}
424566
425567// NewWriteRowsEvent returns a WriteRows event. Uses v2.
0 commit comments