Skip to content

Commit 61a1761

Browse files
[GH-27] Listener splits big transactions (if it possible) (#30)
* Refactored ExtractOperation method * Implemented Bean interface with methods for split events
1 parent 68c6986 commit 61a1761

File tree

12 files changed

+735
-75
lines changed

12 files changed

+735
-75
lines changed

component/bean/bean.go

Lines changed: 111 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,76 +17,142 @@
1717
package bean
1818

1919
import (
20-
"github.com/th2-net/th2-listener-mysql-binlog-go/component/database"
21-
)
22-
23-
const (
24-
insertOperation Operation = "INSERT"
25-
updateOperation Operation = "UPDATE"
26-
deleteOperation Operation = "DELETE"
20+
"encoding/json"
21+
"strconv"
2722

28-
truncateOperation Operation = "TRUNCATE"
29-
createTableOperation Operation = "CREATE_TABLE"
30-
dropTableOperation Operation = "DROP_TABLE"
31-
alterTableOperation Operation = "ALTER_TABLE"
32-
UnknownOperation Operation = "UNKNOWN"
23+
"github.com/th2-net/th2-listener-mysql-binlog-go/component/database"
3324
)
3425

3526
type Operation string
27+
type Bean interface {
28+
// Returns approximate size (bigger or equal than Serialize method return) for instances where Splittable method returns true.
29+
// Returns 0 where Splittable method returns false.
30+
SizeBytes() int
31+
// Returns serialized representation of instance.
32+
Serialize() ([]byte, error)
33+
// Returns true if the instance can be split.
34+
Splittable() bool
35+
// Returns parts as close as possible to passed size.
36+
Split(size int) []Bean
37+
}
3638

37-
type Values map[string]interface{}
39+
type DataMap map[string]any
40+
type DataSlice []DataMap
3841

3942
type Record struct {
4043
Schema string
4144
Table string
4245
Operation Operation
4346
}
4447

45-
type Insert struct {
46-
Record
47-
Inserted []Values
48-
}
49-
50-
type UpdatePair struct {
51-
Before Values
52-
After Values
53-
}
54-
55-
type Update struct {
56-
Record
57-
Updated []UpdatePair
48+
func (r Record) sizeBytes() int {
49+
size := 2 // {...}
50+
size += 9 + jsonSize(r.Schema) + 1 // "Schema":"...",
51+
size += 8 + jsonSize(r.Table) + 1 // "Table":"...",
52+
size += 12 + jsonSize(r.Operation) + 1 // "Operation":"...",
53+
return size
5854
}
5955

60-
type Delete struct {
61-
Record
62-
Deleted []Values
56+
func (val DataMap) sizeBytes() int {
57+
size := 2 // {...}
58+
size += len(val) - 1 // ...,...
59+
for k, v := range val {
60+
size += jsonSize(k) + 1 + jsonSize(v) // "<k>":"<v>"
61+
}
62+
return size
6363
}
6464

65-
type Query struct {
66-
Record
67-
Query string
65+
func jsonSize(value interface{}) int {
66+
switch val := value.(type) {
67+
case nil:
68+
return 4 // null
69+
case int, int8, int16, int32, int64:
70+
return len(strconv.FormatInt(toInt64(val), 10))
71+
case uint, uint8, uint16, uint32, uint64:
72+
return len(strconv.FormatUint(toUint64(val), 10))
73+
case float32:
74+
return len(strconv.FormatFloat(float64(val), 'g', -1, 32))
75+
case float64:
76+
return len(strconv.FormatFloat(val, 'g', -1, 64))
77+
case string:
78+
return len(strconv.Quote(val))
79+
case Operation:
80+
return len(strconv.Quote(string(val)))
81+
case []byte:
82+
return ((len(val)+2)/3)*4 + 2
83+
default:
84+
b, _ := json.Marshal(val)
85+
return len(b)
86+
}
6887
}
6988

70-
func NewInsert(schema string, table string, fields []string, rows [][]any) Insert {
71-
return Insert{Record: Record{Schema: schema, Table: table, Operation: insertOperation}, Inserted: createValues(fields, rows)}
89+
func toInt64(v any) int64 {
90+
switch vv := v.(type) {
91+
case int:
92+
return int64(vv)
93+
case int8:
94+
return int64(vv)
95+
case int16:
96+
return int64(vv)
97+
case int32:
98+
return int64(vv)
99+
case int64:
100+
return vv
101+
}
102+
return 0
72103
}
73104

74-
func NewUpdate(schema string, table string, fields []string, rows [][]any) Update {
75-
return Update{Record: Record{Schema: schema, Table: table, Operation: updateOperation}, Updated: createUpdatePairs(fields, rows)}
105+
func toUint64(v any) uint64 {
106+
switch vv := v.(type) {
107+
case uint:
108+
return uint64(vv)
109+
case uint8:
110+
return uint64(vv)
111+
case uint16:
112+
return uint64(vv)
113+
case uint32:
114+
return uint64(vv)
115+
case uint64:
116+
return vv
117+
}
118+
return 0
76119
}
77120

78-
func NewDelete(schema string, table string, fields []string, rows [][]any) Delete {
79-
return Delete{Record: Record{Schema: schema, Table: table, Operation: deleteOperation}, Deleted: createValues(fields, rows)}
121+
func (ds DataSlice) sizeBytes() int {
122+
size := len(ds) - 1 // ...,...
123+
for _, val := range ds {
124+
size += val.sizeBytes()
125+
}
126+
return size
80127
}
81128

82-
func NewQuery(schema string, table string, query string, operation Operation) Query {
83-
return Query{Record: Record{Schema: schema, Table: table, Operation: operation}, Query: query}
129+
func (ds DataSlice) split(baseSize int, maxSize int) []DataSlice {
130+
var res []DataSlice
131+
var partSize int
132+
var part DataSlice
133+
for i, val := range ds {
134+
valSize := val.sizeBytes()
135+
if i == 0 {
136+
partSize = baseSize + valSize
137+
part = DataSlice{val}
138+
} else {
139+
if partSize+valSize+1 > maxSize {
140+
res = append(res, part)
141+
partSize = baseSize + valSize
142+
part = DataSlice{val}
143+
} else {
144+
partSize += valSize + 1 // ...,...
145+
part = append(part, val)
146+
}
147+
}
148+
}
149+
return append(res, part)
84150
}
85151

86-
func createValues(tableMetadata database.TableMetadata, rows [][]any) []Values {
87-
result := make([]Values, len(rows))
152+
func createValues(tableMetadata database.TableMetadata, rows [][]any) DataSlice {
153+
result := make(DataSlice, len(rows))
88154
for index, row := range rows {
89-
values := Values{}
155+
values := DataMap{}
90156
result[index] = values
91157
for columnIndex, columnValue := range row {
92158
values[tableMetadata[columnIndex]] = columnValue
@@ -95,11 +161,11 @@ func createValues(tableMetadata database.TableMetadata, rows [][]any) []Values {
95161
return result
96162
}
97163

98-
func createUpdatePairs(tableMetadata database.TableMetadata, rows [][]interface{}) []UpdatePair {
164+
func createUpdatePairs(tableMetadata database.TableMetadata, rows [][]any) []UpdatePair {
99165
result := make([]UpdatePair, len(rows)/2)
100166
var pair UpdatePair = UpdatePair{}
101167
for index, row := range rows {
102-
values := Values{}
168+
values := DataMap{}
103169
for columnIndex, columnValue := range row {
104170
values[tableMetadata[columnIndex]] = columnValue
105171
}

0 commit comments

Comments
 (0)