Skip to content

Commit 645af21

Browse files
author
Shlomi Noach
committed
extracted onApplyEventStruct()
1 parent 8d0faa5 commit 645af21

File tree

1 file changed

+36
-31
lines changed

1 file changed

+36
-31
lines changed

go/logic/migrator.go

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,40 @@ func (this *Migrator) iterateChunks() error {
10241024
return nil
10251025
}
10261026

1027+
func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
1028+
if eventStruct.writeFunc != nil {
1029+
if err := this.retryOperation(*eventStruct.writeFunc); err != nil {
1030+
return log.Errore(err)
1031+
}
1032+
}
1033+
if eventStruct.dmlEvent != nil {
1034+
dmlEvents := [](*binlog.BinlogDMLEvent){}
1035+
dmlEvents = append(dmlEvents, eventStruct.dmlEvent)
1036+
1037+
availableEvents := len(this.applyEventsQueue)
1038+
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
1039+
if availableEvents > batchSize {
1040+
availableEvents = batchSize
1041+
}
1042+
for i := 0; i < availableEvents; i++ {
1043+
additionalStruct := <-this.applyEventsQueue
1044+
if additionalStruct.dmlEvent == nil {
1045+
// Not a DML. We don't group this, and we don't batch any further
1046+
break
1047+
}
1048+
dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
1049+
}
1050+
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
1051+
var applyEventFunc tableWriteFunc = func() error {
1052+
return this.applier.ApplyDMLEventQueries(dmlEvents)
1053+
}
1054+
if err := this.retryOperation(applyEventFunc); err != nil {
1055+
return log.Errore(err)
1056+
}
1057+
}
1058+
return nil
1059+
}
1060+
10271061
// executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
10281062
// This is where the ghost table gets the data. The function fills the data single-threaded.
10291063
// Both event backlog and rowcopy events are polled; the backlog events have precedence.
@@ -1038,38 +1072,9 @@ func (this *Migrator) executeWriteFuncs() error {
10381072
// We give higher priority to event processing, then secondary priority to
10391073
// rowcopy
10401074
select {
1041-
case applyEventStruct := <-this.applyEventsQueue:
1075+
case eventStruct := <-this.applyEventsQueue:
10421076
{
1043-
if applyEventStruct.writeFunc != nil {
1044-
if err := this.retryOperation(*applyEventStruct.writeFunc); err != nil {
1045-
return log.Errore(err)
1046-
}
1047-
}
1048-
if applyEventStruct.dmlEvent != nil {
1049-
dmlEvents := [](*binlog.BinlogDMLEvent){}
1050-
dmlEvents = append(dmlEvents, applyEventStruct.dmlEvent)
1051-
1052-
availableEvents := len(this.applyEventsQueue)
1053-
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
1054-
if availableEvents > batchSize {
1055-
availableEvents = batchSize
1056-
}
1057-
for i := 0; i < availableEvents; i++ {
1058-
additionalStruct := <-this.applyEventsQueue
1059-
if additionalStruct.dmlEvent == nil {
1060-
// Not a DML. We don't group this, and we don't batch any further
1061-
break
1062-
}
1063-
dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
1064-
}
1065-
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
1066-
var applyEventFunc tableWriteFunc = func() error {
1067-
return this.applier.ApplyDMLEventQueries(dmlEvents)
1068-
}
1069-
if err := this.retryOperation(applyEventFunc); err != nil {
1070-
return log.Errore(err)
1071-
}
1072-
}
1077+
this.onApplyEventStruct(eventStruct)
10731078
}
10741079
default:
10751080
{

0 commit comments

Comments
 (0)