@@ -1025,14 +1025,21 @@ func (this *Migrator) iterateChunks() error {
1025
1025
}
1026
1026
1027
1027
func (this * Migrator ) onApplyEventStruct (eventStruct * applyEventStruct ) error {
1028
- if eventStruct .writeFunc != nil {
1029
- if err := this .retryOperation (* eventStruct .writeFunc ); err != nil {
1030
- return log .Errore (err )
1028
+ handleNonDMLEventStruct := func (eventStruct * applyEventStruct ) error {
1029
+ if eventStruct .writeFunc != nil {
1030
+ if err := this .retryOperation (* eventStruct .writeFunc ); err != nil {
1031
+ return log .Errore (err )
1032
+ }
1031
1033
}
1034
+ return nil
1035
+ }
1036
+ if eventStruct .dmlEvent == nil {
1037
+ return handleNonDMLEventStruct (eventStruct )
1032
1038
}
1033
1039
if eventStruct .dmlEvent != nil {
1034
1040
dmlEvents := [](* binlog.BinlogDMLEvent ){}
1035
1041
dmlEvents = append (dmlEvents , eventStruct .dmlEvent )
1042
+ var nonDmlStructToApply * applyEventStruct
1036
1043
1037
1044
availableEvents := len (this .applyEventsQueue )
1038
1045
batchSize := int (atomic .LoadInt64 (& this .migrationContext .DMLBatchSize ))
@@ -1043,6 +1050,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
1043
1050
additionalStruct := <- this .applyEventsQueue
1044
1051
if additionalStruct .dmlEvent == nil {
1045
1052
// Not a DML. We don't group this, and we don't batch any further
1053
+ nonDmlStructToApply = additionalStruct
1046
1054
break
1047
1055
}
1048
1056
dmlEvents = append (dmlEvents , additionalStruct .dmlEvent )
@@ -1054,6 +1062,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
1054
1062
if err := this .retryOperation (applyEventFunc ); err != nil {
1055
1063
return log .Errore (err )
1056
1064
}
1065
+ if nonDmlStructToApply != nil {
1066
+ // We pulled DML events from the queue, and then we hit a non-DML event. Wait!
1067
+ // We need to handle it!
1068
+ if err := handleNonDMLEventStruct (nonDmlStructToApply ); err != nil {
1069
+ return log .Errore (err )
1070
+ }
1071
+ }
1057
1072
}
1058
1073
return nil
1059
1074
}
0 commit comments