@@ -11,6 +11,7 @@ import (
11
11
"fmt"
12
12
"io"
13
13
"math/rand"
14
+ "strings"
14
15
"sync/atomic"
15
16
"testing"
16
17
"time"
@@ -30,12 +31,12 @@ import (
30
31
// or corrupting data.
31
32
//
32
33
// TODO(jeffswenson): use this to test GCS and Azure
33
- // TODO(jeffswenson): add a list operation
34
34
func RunCloudNemesisTest (t * testing.T , storage cloud.ExternalStorage ) {
35
35
nemesis := & cloudNemesis {
36
36
storage : storage ,
37
37
writeConcurrency : 1 ,
38
38
readConcurrency : 40 ,
39
+ listConcurrency : 1 ,
39
40
}
40
41
41
42
// We create a context here because we don't want to support a caller supplied
@@ -47,15 +48,18 @@ func RunCloudNemesisTest(t *testing.T, storage cloud.ExternalStorage) {
47
48
48
49
require .Greater (t , nemesis .writeSuccesses .Load (), int64 (5 ), "not enough completed writes" )
49
50
require .Greater (t , nemesis .readSuccesses .Load (), int64 (200 ), "not enough completed reads" )
51
+ require .Greater (t , nemesis .listSuccesses .Load (), int64 (5 ), "not enough completed lists" )
50
52
}
51
53
52
54
type cloudNemesis struct {
53
55
storage cloud.ExternalStorage
54
56
writeConcurrency int
55
57
readConcurrency int
58
+ listConcurrency int
56
59
57
60
readSuccesses atomic.Int64
58
61
writeSuccesses atomic.Int64
62
+ listSuccesses atomic.Int64
59
63
60
64
mu struct {
61
65
syncutil.Mutex
@@ -118,6 +122,22 @@ func (c *cloudNemesis) run(ctx context.Context, duration time.Duration) error {
118
122
})
119
123
}
120
124
125
+ for i := 0 ; i < c .listConcurrency ; i ++ {
126
+ g .Go (func () error {
127
+ for {
128
+ time .Sleep (time .Millisecond )
129
+ select {
130
+ case <- done :
131
+ return nil
132
+ default :
133
+ if err := c .listObjects (ctx ); err != nil {
134
+ return err
135
+ }
136
+ }
137
+ }
138
+ })
139
+ }
140
+
121
141
// We shouldn't observe any errors from the client. We are injecting errors
122
142
// that should be transparently retried.
123
143
return g .Wait ()
@@ -213,6 +233,42 @@ func (c *cloudNemesis) readObject(ctx context.Context) (err error) {
213
233
return nil
214
234
}
215
235
236
+ func (c * cloudNemesis ) listObjects (ctx context.Context ) (err error ) {
237
+ before := c .snapshotObjects ()
238
+
239
+ listedFiles := map [string ]bool {}
240
+ err = c .storage .List (ctx , "" , "" , func (filename string ) error {
241
+ listedFiles [strings .TrimPrefix (filename , "/" )] = true
242
+ return nil
243
+ })
244
+ if err != nil {
245
+ return errors .Wrap (err , "unable to list files" )
246
+ }
247
+
248
+ // Check if there are any missing files in the listing.
249
+ for _ , o := range before {
250
+ if o .finished {
251
+ if ! listedFiles [o .name ] {
252
+ return errors .AssertionFailedf ("expected to find object %s in listing" , o .name )
253
+ }
254
+ }
255
+ }
256
+
257
+ // Check if there are any unexpected files in the listing.
258
+ afterFiles := map [string ]bool {}
259
+ for _ , o := range c .snapshotObjects () {
260
+ afterFiles [o .name ] = true
261
+ }
262
+ for filename := range listedFiles {
263
+ if ! afterFiles [filename ] {
264
+ return errors .AssertionFailedf ("found unexpected object %s in listing" , filename )
265
+ }
266
+ }
267
+
268
+ c .listSuccesses .Add (1 )
269
+ return nil
270
+ }
271
+
216
272
func (c * cloudNemesis ) newObject () cloudObject {
217
273
c .mu .Lock ()
218
274
defer c .mu .Unlock ()
@@ -249,6 +305,15 @@ func (c *cloudNemesis) randomObject() cloudObject {
249
305
return c .mu .objects [rand .Intn (len (c .mu .objects ))]
250
306
}
251
307
308
+ func (c * cloudNemesis ) snapshotObjects () []cloudObject {
309
+ c .mu .Lock ()
310
+ defer c .mu .Unlock ()
311
+
312
+ snapshot := make ([]cloudObject , len (c .mu .objects ))
313
+ copy (snapshot , c .mu .objects )
314
+ return snapshot
315
+ }
316
+
252
317
// generatedObject is a deterministic implementation of io.Reader.
253
318
type generatedObject struct {
254
319
seed int64
0 commit comments