Skip to content

Commit 0ab8341

Browse files
committed
syncslice, too
1 parent cd76858 commit 0ab8341

File tree

3 files changed

+333
-14
lines changed

3 files changed

+333
-14
lines changed

internal/verifier/compare.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/10gen/migration-verifier/internal/types"
99
"github.com/10gen/migration-verifier/syncmap"
10+
"github.com/10gen/migration-verifier/syncslice"
1011
"github.com/pkg/errors"
1112
"go.mongodb.org/mongo-driver/bson"
1213
"go.mongodb.org/mongo-driver/mongo"
@@ -66,9 +67,9 @@ func (verifier *Verifier) compareDocsFromChannels(
6667
types.ByteCount,
6768
error,
6869
) {
69-
results := []VerificationResult{}
70-
var docCount types.DocumentCount
71-
var byteCount types.ByteCount
70+
results := syncslice.New[VerificationResult]()
71+
var srcDocCount types.DocumentCount
72+
var srcByteCount types.ByteCount
7273

7374
mapKeyFieldNames := make([]string, 1+len(task.QueryFilter.ShardKeys))
7475
mapKeyFieldNames[0] = "_id"
@@ -131,7 +132,7 @@ func (verifier *Verifier) compareDocsFromChannels(
131132
return errors.Wrap(err, "failed to compare documents")
132133
}
133134

134-
results = append(results, mismatches...)
135+
results = results.Push(mismatches...)
135136

136137
return nil
137138
}
@@ -166,8 +167,8 @@ func (verifier *Verifier) compareDocsFromChannels(
166167
srcClosed = true
167168
break
168169
}
169-
docCount++
170-
byteCount += types.ByteCount(len(doc))
170+
srcDocCount++
171+
srcByteCount += types.ByteCount(len(doc))
171172

172173
err := handleNewDoc(doc, true)
173174

@@ -229,12 +230,8 @@ func (verifier *Verifier) compareDocsFromChannels(
229230
// At this point, any documents left in the cache maps are simply
230231
// missing on the other side. We add results for those.
231232

232-
// We might as well pre-grow the slice:
233-
results = slices.Grow(results, srcCache.Len()+dstCache.Len())
234-
235233
srcCache.Range(func(_ string, doc bson.Raw) bool {
236-
results = append(
237-
results,
234+
results = results.Push(
238235
VerificationResult{
239236
ID: doc.Lookup("_id"),
240237
Details: Missing,
@@ -248,8 +245,7 @@ func (verifier *Verifier) compareDocsFromChannels(
248245
})
249246

250247
dstCache.Range(func(_ string, doc bson.Raw) bool {
251-
results = append(
252-
results,
248+
results = results.Push(
253249
VerificationResult{
254250
ID: doc.Lookup("_id"),
255251
Details: Missing,
@@ -262,7 +258,7 @@ func (verifier *Verifier) compareDocsFromChannels(
262258
return true
263259
})
264260

265-
return results, docCount, byteCount, nil
261+
return results.Get(), srcDocCount, srcByteCount, nil
266262
}
267263

268264
func simpleTimerReset(t *time.Timer, dur time.Duration) {

syncslice/syncslice.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package syncslice
2+
3+
import "sync"
4+
5+
// SyncSlice implements a generic slice along with a concurrency control.
6+
// It lets you manipulate a slice across goroutines easily & safely.
7+
type SyncSlice[T any] struct {
8+
theSlice []T
9+
mutex sync.RWMutex
10+
}
11+
12+
// New creates and populates a new SyncSlice.
13+
func New[T any](els ...T) *SyncSlice[T] {
14+
return &SyncSlice[T]{
15+
theSlice: els,
16+
}
17+
}
18+
19+
// ----------------------------------------------------------------------
20+
21+
// Get returns a copy of the raw underlying slice.
22+
func (ss *SyncSlice[T]) Get() []T {
23+
ss.mutex.RLock()
24+
defer ss.mutex.RUnlock()
25+
26+
return ss.theSlice
27+
}
28+
29+
// Len returns the number of elements in the underlying slice.
30+
func (ss *SyncSlice[T]) Len() int {
31+
ss.mutex.RLock()
32+
defer ss.mutex.RUnlock()
33+
34+
return len(ss.theSlice)
35+
}
36+
37+
// ----------------------------------------------------------------------
38+
39+
// Push appends 0 or more elements.
40+
func (ss *SyncSlice[T]) Push(els ...T) *SyncSlice[T] {
41+
ss.mutex.Lock()
42+
defer ss.mutex.Unlock()
43+
44+
ss.theSlice = append(ss.theSlice, els...)
45+
46+
return ss
47+
}
48+
49+
// Pop removes the slice’s last item and returns it.
50+
// This will panic if the slice is empty.
51+
func (ss *SyncSlice[T]) Pop() T {
52+
ss.mutex.Lock()
53+
defer ss.mutex.Unlock()
54+
55+
popped := ss.theSlice[len(ss.theSlice)-1]
56+
57+
ss.theSlice = ss.theSlice[:len(ss.theSlice)-1]
58+
59+
return popped
60+
}
61+
62+
// Slice removes the slice’s first item and returns it.
63+
// This will panic if the slice is empty.
64+
func (ss *SyncSlice[T]) Shift() T {
65+
ss.mutex.Lock()
66+
defer ss.mutex.Unlock()
67+
68+
shifted := ss.theSlice[0]
69+
ss.theSlice = ss.theSlice[1:]
70+
71+
return shifted
72+
}
73+
74+
// Unshift inserts 0 or more elements at the start of the slice.
75+
func (ss *SyncSlice[T]) Unshift(els ...T) *SyncSlice[T] {
76+
ss.mutex.Lock()
77+
defer ss.mutex.Unlock()
78+
79+
ss.theSlice = append(els, ss.theSlice...)
80+
81+
return ss
82+
}
83+
84+
// Update updates the slice using the given callback.
85+
func (ss *SyncSlice[T]) Update(cb func([]T) []T) {
86+
ss.mutex.Lock()
87+
defer ss.mutex.Unlock()
88+
89+
ss.theSlice = cb(ss.theSlice)
90+
}

syncslice/unit_test.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package syncslice
2+
3+
import (
4+
"strings"
5+
"sync"
6+
"testing"
7+
8+
"github.com/10gen/mongosync/mongo-go/goroutine"
9+
mslices "github.com/10gen/mongosync/mongo-go/slices"
10+
"github.com/stretchr/testify/suite"
11+
)
12+
13+
type mySuite struct {
14+
suite.Suite
15+
}
16+
17+
func TestUnitTestSuite(t *testing.T) {
18+
suite.Run(t, &mySuite{})
19+
}
20+
21+
func (s *mySuite) TestSyncSlice() {
22+
slice := New("123")
23+
24+
gotten := slice.Get()
25+
length := 0
26+
27+
s.Run(
28+
"Push",
29+
func() {
30+
wg := sync.WaitGroup{}
31+
wg.Add(3)
32+
33+
goroutine.Go(func() {
34+
defer wg.Done()
35+
gotten = slice.Get()
36+
})
37+
38+
goroutine.Go(func() {
39+
defer wg.Done()
40+
length = slice.Len()
41+
})
42+
43+
goroutine.Go(func() {
44+
defer wg.Done()
45+
s.Assert().Equal(
46+
slice,
47+
slice.Push("234"),
48+
)
49+
})
50+
51+
wg.Wait()
52+
53+
s.Assert().Contains(
54+
mslices.Of("123", "123 234"),
55+
strings.Join(gotten, " "),
56+
)
57+
58+
s.Assert().Contains(
59+
mslices.Of(1, 2),
60+
length,
61+
)
62+
},
63+
)
64+
65+
s.Run(
66+
"Pop",
67+
func() {
68+
wg := sync.WaitGroup{}
69+
wg.Add(3)
70+
71+
goroutine.Go(func() {
72+
defer wg.Done()
73+
gotten = slice.Get()
74+
})
75+
76+
goroutine.Go(func() {
77+
defer wg.Done()
78+
length = slice.Len()
79+
})
80+
81+
goroutine.Go(func() {
82+
defer wg.Done()
83+
s.Assert().Equal(
84+
"234",
85+
slice.Pop(),
86+
)
87+
})
88+
89+
wg.Wait()
90+
91+
s.Assert().Contains(
92+
mslices.Of("123", "123 234"),
93+
strings.Join(gotten, " "),
94+
)
95+
96+
s.Assert().Contains(
97+
mslices.Of(1, 2),
98+
length,
99+
)
100+
},
101+
)
102+
103+
s.Run(
104+
"Unshift",
105+
func() {
106+
wg := sync.WaitGroup{}
107+
wg.Add(3)
108+
109+
goroutine.Go(func() {
110+
defer wg.Done()
111+
gotten = slice.Get()
112+
})
113+
114+
goroutine.Go(func() {
115+
defer wg.Done()
116+
length = slice.Len()
117+
})
118+
119+
goroutine.Go(func() {
120+
defer wg.Done()
121+
s.Assert().Equal(
122+
slice,
123+
slice.Unshift("234"),
124+
)
125+
})
126+
127+
wg.Wait()
128+
129+
s.Assert().Contains(
130+
mslices.Of("123", "234 123"),
131+
strings.Join(gotten, " "),
132+
)
133+
134+
s.Assert().Contains(
135+
mslices.Of(1, 2),
136+
length,
137+
)
138+
},
139+
)
140+
141+
s.Run(
142+
"Shift",
143+
func() {
144+
wg := sync.WaitGroup{}
145+
wg.Add(3)
146+
147+
goroutine.Go(func() {
148+
defer wg.Done()
149+
gotten = slice.Get()
150+
})
151+
152+
goroutine.Go(func() {
153+
defer wg.Done()
154+
length = slice.Len()
155+
})
156+
157+
goroutine.Go(func() {
158+
defer wg.Done()
159+
s.Assert().Equal(
160+
"234",
161+
slice.Shift(),
162+
)
163+
})
164+
165+
wg.Wait()
166+
167+
s.Assert().Contains(
168+
mslices.Of("123", "234 123"),
169+
strings.Join(gotten, " "),
170+
)
171+
172+
s.Assert().Contains(
173+
mslices.Of(1, 2),
174+
length,
175+
)
176+
},
177+
)
178+
179+
s.Run(
180+
"Update",
181+
func() {
182+
wg := sync.WaitGroup{}
183+
wg.Add(2)
184+
185+
var length int
186+
var gotten []string
187+
188+
goroutine.Go(func() {
189+
defer wg.Done()
190+
slice.Update(func(arr []string) []string {
191+
length = len(arr)
192+
gotten = arr
193+
return []string{
194+
"345",
195+
"456",
196+
"567",
197+
"678",
198+
}
199+
})
200+
})
201+
202+
goroutine.Go(func() {
203+
defer wg.Done()
204+
slice.Update(func(arr []string) []string {
205+
length = len(arr)
206+
gotten = arr
207+
return []string{
208+
"789",
209+
"891",
210+
"912",
211+
}
212+
})
213+
})
214+
215+
wg.Wait()
216+
217+
switch length {
218+
case 3:
219+
s.Assert().Contains(
220+
mslices.Of("234 123", "789 891 912"),
221+
strings.Join(gotten, " "),
222+
)
223+
case 4:
224+
s.Assert().Contains(
225+
mslices.Of("234 123", "345 456 567 678"),
226+
strings.Join(gotten, " "),
227+
)
228+
default:
229+
s.Assert().Fail("expected 3 or 4 elements")
230+
}
231+
},
232+
)
233+
}

0 commit comments

Comments
 (0)