Skip to content

Commit 00f395b

Browse files
committed
PBM-886: sync channel set/unset/close ops
1 parent c91f62a commit 00f395b

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

pbm/oplog/backup.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"sync"
78
"time"
89

910
"go.mongodb.org/mongo-driver/bson"
@@ -31,6 +32,7 @@ func (t Timeline) String() string {
3132
// OplogBackup is used for reading the Mongodb oplog
3233
type OplogBackup struct {
3334
cl *mongo.Client
35+
mu sync.Mutex
3436
stopC chan struct{}
3537
start primitive.Timestamp
3638
end primitive.Timestamp
@@ -68,7 +70,10 @@ func (ot *OplogBackup) WriteTo(w io.Writer) (int64, error) {
6870
return 0, errors.Errorf("oplog TailingSpan should be set, have start: %v, end: %v", ot.start, ot.end)
6971
}
7072

73+
ot.mu.Lock()
7174
ot.stopC = make(chan struct{})
75+
ot.mu.Unlock()
76+
7277
ctx, cancel := context.WithCancel(context.Background())
7378
defer cancel()
7479

@@ -79,7 +84,9 @@ func (ot *OplogBackup) WriteTo(w io.Writer) (int64, error) {
7984
cancel()
8085
}
8186

87+
ot.mu.Lock()
8288
ot.stopC = nil
89+
ot.mu.Unlock()
8390
}()
8491

8592
cur, err := ot.cl.Database("local").Collection("oplog.rs").Find(ctx,
@@ -145,6 +152,9 @@ func (ot *OplogBackup) WriteTo(w io.Writer) (int64, error) {
145152
}
146153

147154
func (ot *OplogBackup) Cancel() {
155+
ot.mu.Lock()
156+
defer ot.mu.Unlock()
157+
148158
if c := ot.stopC; c != nil {
149159
select {
150160
case _, ok := <-c:

pbm/storage/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func Upload(
214214

215215
err := r.Close()
216216
if err != nil {
217-
return 0, errors.Wrap(err, "cancel backup: close reader")
217+
return 0, errors.Wrap(err, "cancel upload: close reader")
218218
}
219219
return 0, ErrCancelled
220220
case <-saveDone:

0 commit comments

Comments
 (0)