Skip to content

Commit 2eacc78

Browse files
committed
wait until lowWaterMark >= lastCommitted in WaitForTransaction
1 parent 076a806 commit 2eacc78

File tree

1 file changed

+4
-8
lines changed

1 file changed

+4
-8
lines changed

go/logic/coordinator.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ type Coordinator struct {
4343
// list of workers
4444
workers []*Worker
4545

46-
// The low water mark. This is the sequence number of the last job that has been committed.
46+
// The low water mark. We maintain that all transactions with
47+
// sequence number <= lowWaterMark have been completed.
4748
lowWaterMark int64
4849

4950
// This is a map of completed jobs by their sequence numbers.
@@ -194,6 +195,7 @@ func (w *Worker) ProcessEvents() error {
194195
if len(dmlEvents) == cap(dmlEvents) {
195196
if err := w.applyDMLEvents(dmlEvents); err != nil {
196197
w.coordinator.migrationContext.Log.Errore(err)
198+
// TODO do something with the err
197199
}
198200
dmlEvents = dmlEvents[:0]
199201
}
@@ -479,10 +481,6 @@ func (c *Coordinator) WaitForTransaction(lastCommitted int64) chan struct{} {
479481
return nil
480482
}
481483

482-
if _, ok := c.completedJobs[lastCommitted]; ok {
483-
return nil
484-
}
485-
486484
waitChannel := make(chan struct{})
487485
c.waitingJobs[lastCommitted] = append(c.waitingJobs[lastCommitted], waitChannel)
488486

@@ -503,8 +501,6 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
503501
c.mu.Lock()
504502
defer c.mu.Unlock()
505503

506-
//c.migrationContext.Log.Infof("Coordinator: Marking job as completed: %d\n", sequenceNumber)
507-
508504
// Mark the job as completed
509505
c.completedJobs[sequenceNumber] = &mysql.BinlogCoordinates{LogPos: logPos, EventSize: eventSize}
510506

@@ -522,7 +518,7 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
522518

523519
// Schedule any jobs that were waiting for this job to complete or for the low watermark
524520
for waitingForSequenceNumber, channels := range c.waitingJobs {
525-
if waitingForSequenceNumber <= c.lowWaterMark || waitingForSequenceNumber == sequenceNumber {
521+
if waitingForSequenceNumber <= c.lowWaterMark {
526522
channelsToNotify = append(channelsToNotify, channels...)
527523
delete(c.waitingJobs, waitingForSequenceNumber)
528524
}

0 commit comments

Comments
 (0)