Skip to content

Commit 26e80ef

Browse files
authored
Merge branch 'master' into clear
2 parents 5684eac + 188f2bf commit 26e80ef

File tree

3 files changed

+14
-1
lines changed

3 files changed

+14
-1
lines changed

replication/binlogsyncer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ type BinlogSyncerConfig struct {
134134
// Only works with MariaDB flavor.
135135
FillZeroLogPos bool
136136

137+
// PayloadDecoderConcurrency is used to control concurrency for decoding TransactionPayloadEvent.
138+
// Default 0, this will be set to GOMAXPROCS.
139+
PayloadDecoderConcurrency int
140+
137141
// SynchronousEventHandler is used for synchronous event handling.
138142
// This should not be used together with StartBackupWithHandler.
139143
// If this is not nil, GetEvent does not need to be called.
@@ -208,6 +212,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
208212
b.parser.SetUseDecimal(b.cfg.UseDecimal)
209213
b.parser.SetUseFloatWithTrailingZero(b.cfg.UseFloatWithTrailingZero)
210214
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
215+
b.parser.SetPayloadDecoderConcurrency(cfg.PayloadDecoderConcurrency)
211216
b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc)
212217
b.parser.SetTableMapOptionalMetaDecodeFunc(b.cfg.TableMapOptionalMetaDecodeFunc)
213218
b.running = false

replication/parser.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type BinlogParser struct {
4040
ignoreJSONDecodeErr bool
4141
verifyChecksum bool
4242

43+
payloadDecoderConcurrency int
44+
4345
rowsEventDecodeFunc func(*RowsEvent, []byte) error
4446

4547
tableMapOptionalMetaDecodeFunc func([]byte) error
@@ -215,6 +217,10 @@ func (p *BinlogParser) SetFlavor(flavor string) {
215217
p.flavor = flavor
216218
}
217219

220+
func (p *BinlogParser) SetPayloadDecoderConcurrency(concurrency int) {
221+
p.payloadDecoderConcurrency = concurrency
222+
}
223+
218224
func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) {
219225
p.rowsEventDecodeFunc = rowsEventDecodeFunc
220226
}
@@ -456,6 +462,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
456462
func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent {
457463
e := &TransactionPayloadEvent{}
458464
e.format = *p.format
465+
e.concurrency = p.payloadDecoderConcurrency
459466

460467
return e
461468
}

replication/transaction_payload_event.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828

2929
type TransactionPayloadEvent struct {
3030
format FormatDescriptionEvent
31+
concurrency int
3132
Size uint64
3233
UncompressedSize uint64
3334
CompressionType uint64
@@ -103,7 +104,7 @@ func (e *TransactionPayloadEvent) decodePayload() error {
103104
e.CompressionType, e.compressionType())
104105
}
105106

106-
decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
107+
decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(e.concurrency))
107108
if err != nil {
108109
return err
109110
}

0 commit comments

Comments
 (0)