@@ -24,7 +24,7 @@ func assertOrdered(cids []cid.Cid, q *dsqueue.DSQueue, t *testing.T) {
24
24
var count int
25
25
for i , c := range cids {
26
26
select {
27
- case dequeued , ok := <- q .Dequeue ():
27
+ case dequeued , ok := <- q .Out ():
28
28
if ! ok {
29
29
t .Fatal ("queue closed" )
30
30
}
@@ -53,29 +53,52 @@ func TestBasicOperation(t *testing.T) {
53
53
t .Fatal ("wrong queue name" )
54
54
}
55
55
56
- queue .Enqueue (nil )
56
+ queue .Put (nil )
57
57
select {
58
- case <- queue .Dequeue ():
58
+ case <- queue .Out ():
59
59
t .Fatal ("nothing should be in queue" )
60
60
case <- time .After (time .Millisecond ):
61
61
}
62
62
63
- cids := random .Cids (10 )
64
- for _ , c := range cids {
65
- queue .Enqueue (c .Bytes ())
66
- }
63
+ out := make (chan []string )
64
+ go func () {
65
+ var outStrs []string
66
+ for {
67
+ select {
68
+ case dq , open := <- queue .Out ():
69
+ if ! open {
70
+ out <- outStrs
71
+ return
72
+ }
73
+ dqItem := string (dq )
74
+ t .Log ("got:" , dqItem )
75
+ outStrs = append (outStrs , dqItem )
76
+ }
77
+ }
78
+ }()
67
79
68
- assertOrdered (cids , queue , t )
80
+ items := []string {"apple" , "banana" , "cherry" }
81
+ for _ , item := range items {
82
+ queue .Put ([]byte (item ))
83
+ }
69
84
85
+ time .Sleep (time .Second )
70
86
err := queue .Close ()
71
87
if err != nil {
72
88
t .Fatal (err )
73
89
}
90
+
91
+ qout := <- out
92
+
93
+ if len (qout ) != len (items ) {
94
+ t .Fatalf ("dequeued wrond number of items, expected %d, got %d" , len (items ), len (qout ))
95
+ }
96
+
74
97
if err = queue .Close (); err != nil {
75
98
t .Fatal (err )
76
99
}
77
100
78
- err = queue .Enqueue ( cids [ 0 ]. Bytes ( ))
101
+ err = queue .Put ([] byte ( items [ 0 ] ))
79
102
if err == nil {
80
103
t .Fatal ("expected error calling Enqueue after Close" )
81
104
}
@@ -98,7 +121,7 @@ func TestMangledData(t *testing.T) {
98
121
99
122
cids := random .Cids (10 )
100
123
for _ , c := range cids {
101
- queue .Enqueue (c .Bytes ())
124
+ queue .Put (c .Bytes ())
102
125
}
103
126
104
127
// expect to only see the valid cids we entered
@@ -113,7 +136,7 @@ func TestInitialization(t *testing.T) {
113
136
114
137
cids := random .Cids (10 )
115
138
for _ , c := range cids {
116
- queue .Enqueue (c .Bytes ())
139
+ queue .Put (c .Bytes ())
117
140
}
118
141
119
142
assertOrdered (cids [:5 ], queue , t )
@@ -137,7 +160,7 @@ func TestIdleFlush(t *testing.T) {
137
160
138
161
cids := random .Cids (10 )
139
162
for _ , c := range cids {
140
- queue .Enqueue (c .Bytes ())
163
+ queue .Put (c .Bytes ())
141
164
}
142
165
143
166
dsn := namespace .Wrap (ds , datastore .NewKey ("/dsq-" + dsqName ))
@@ -195,7 +218,7 @@ func TestPersistManyCids(t *testing.T) {
195
218
196
219
cids := random .Cids (25 )
197
220
for _ , c := range cids {
198
- queue .Enqueue (c .Bytes ())
221
+ queue .Put (c .Bytes ())
199
222
}
200
223
201
224
err := queue .Close ()
@@ -216,7 +239,7 @@ func TestPersistOneCid(t *testing.T) {
216
239
defer queue .Close ()
217
240
218
241
cids := random .Cids (1 )
219
- queue .Enqueue (cids [0 ].Bytes ())
242
+ queue .Put (cids [0 ].Bytes ())
220
243
221
244
err := queue .Close ()
222
245
if err != nil {
@@ -236,14 +259,14 @@ func TestDeduplicateCids(t *testing.T) {
236
259
defer queue .Close ()
237
260
238
261
cids := random .Cids (5 )
239
- queue .Enqueue (cids [0 ].Bytes ())
240
- queue .Enqueue (cids [0 ].Bytes ())
241
- queue .Enqueue (cids [1 ].Bytes ())
242
- queue .Enqueue (cids [2 ].Bytes ())
243
- queue .Enqueue (cids [1 ].Bytes ())
244
- queue .Enqueue (cids [3 ].Bytes ())
245
- queue .Enqueue (cids [0 ].Bytes ())
246
- queue .Enqueue (cids [4 ].Bytes ())
262
+ queue .Put (cids [0 ].Bytes ())
263
+ queue .Put (cids [0 ].Bytes ())
264
+ queue .Put (cids [1 ].Bytes ())
265
+ queue .Put (cids [2 ].Bytes ())
266
+ queue .Put (cids [1 ].Bytes ())
267
+ queue .Put (cids [3 ].Bytes ())
268
+ queue .Put (cids [0 ].Bytes ())
269
+ queue .Put (cids [4 ].Bytes ())
247
270
248
271
assertOrdered (cids , queue , t )
249
272
@@ -253,7 +276,7 @@ func TestDeduplicateCids(t *testing.T) {
253
276
254
277
cids = append (cids , cids [0 ], cids [0 ], cids [1 ])
255
278
for _ , c := range cids {
256
- queue .Enqueue (c .Bytes ())
279
+ queue .Put (c .Bytes ())
257
280
}
258
281
assertOrdered (cids , queue , t )
259
282
}
@@ -266,7 +289,7 @@ func TestClear(t *testing.T) {
266
289
defer queue .Close ()
267
290
268
291
for _ , c := range random .Cids (cidCount ) {
269
- queue .Enqueue (c .Bytes ())
292
+ queue .Put (c .Bytes ())
270
293
}
271
294
272
295
// Cause queued entried to be saved in datastore.
@@ -279,7 +302,7 @@ func TestClear(t *testing.T) {
279
302
defer queue .Close ()
280
303
281
304
for _ , c := range random .Cids (cidCount ) {
282
- queue .Enqueue (c .Bytes ())
305
+ queue .Put (c .Bytes ())
283
306
}
284
307
285
308
rmCount := queue .Clear ()
@@ -297,7 +320,7 @@ func TestClear(t *testing.T) {
297
320
defer queue .Close ()
298
321
299
322
select {
300
- case <- queue .Dequeue ():
323
+ case <- queue .Out ():
301
324
t .Fatal ("dequeue should not return" )
302
325
case <- time .After (10 * time .Millisecond ):
303
326
}
@@ -314,7 +337,7 @@ func TestCloseTimeout(t *testing.T) {
314
337
315
338
cids := random .Cids (5 )
316
339
for _ , c := range cids {
317
- queue .Enqueue (c .Bytes ())
340
+ queue .Put (c .Bytes ())
318
341
}
319
342
320
343
err := queue .Close ()
@@ -331,7 +354,7 @@ func TestCloseTimeout(t *testing.T) {
331
354
defer queue .Close ()
332
355
333
356
for _ , c := range cids {
334
- queue .Enqueue (c .Bytes ())
357
+ queue .Put (c .Bytes ())
335
358
}
336
359
if err = queue .Close (); err != nil {
337
360
t .Fatal (err )
0 commit comments