@@ -131,6 +131,83 @@ func TestDiskQueueRoll(t *testing.T) {
131131 }
132132}
133133
134+ func TestDiskQueuePeek (t * testing.T ) {
135+ l := NewTestLogger (t )
136+ dqName := "test_disk_queue_peek" + strconv .Itoa (int (time .Now ().Unix ()))
137+ tmpDir , err := ioutil .TempDir ("" , fmt .Sprintf ("nsq-test-%d" , time .Now ().UnixNano ()))
138+ if err != nil {
139+ panic (err )
140+ }
141+ defer os .RemoveAll (tmpDir )
142+ msg := bytes .Repeat ([]byte {0 }, 10 )
143+ ml := int64 (len (msg ))
144+ dq := New (dqName , tmpDir , 10 * (ml + 4 ), int32 (ml ), 1 << 10 , 2500 , 2 * time .Second , l )
145+ defer dq .Close ()
146+ NotNil (t , dq )
147+ Equal (t , int64 (0 ), dq .Depth ())
148+
149+ t .Run ("roll" , func (t * testing.T ) {
150+ for i := 0 ; i < 10 ; i ++ {
151+ err := dq .Put (msg )
152+ Nil (t , err )
153+ Equal (t , int64 (i + 1 ), dq .Depth ())
154+ }
155+
156+ for i := 10 ; i > 0 ; i -- {
157+ Equal (t , msg , <- dq .PeekChan ())
158+ Equal (t , int64 (i ), dq .Depth ())
159+
160+ Equal (t , msg , <- dq .ReadChan ())
161+ Equal (t , int64 (i - 1 ), dq .Depth ())
162+ }
163+
164+ Nil (t , dq .Empty ())
165+ })
166+
167+ t .Run ("peek-read" , func (t * testing.T ) {
168+ for i := 0 ; i < 10 ; i ++ {
169+ err := dq .Put (msg )
170+ Nil (t , err )
171+ Equal (t , int64 (i + 1 ), dq .Depth ())
172+ }
173+
174+ for i := 10 ; i > 0 ; i -- {
175+ Equal (t , msg , <- dq .PeekChan ())
176+ Equal (t , int64 (i ), dq .Depth ())
177+
178+ Equal (t , msg , <- dq .PeekChan ())
179+ Equal (t , int64 (i ), dq .Depth ())
180+
181+ Equal (t , msg , <- dq .ReadChan ())
182+ Equal (t , int64 (i - 1 ), dq .Depth ())
183+ }
184+
185+ Nil (t , dq .Empty ())
186+ })
187+
188+ t .Run ("read-peek" , func (t * testing.T ) {
189+ for i := 0 ; i < 10 ; i ++ {
190+ err := dq .Put (msg )
191+ Nil (t , err )
192+ Equal (t , int64 (i + 1 ), dq .Depth ())
193+ }
194+
195+ for i := 10 ; i > 1 ; i -- {
196+ Equal (t , msg , <- dq .PeekChan ())
197+ Equal (t , int64 (i ), dq .Depth ())
198+
199+ Equal (t , msg , <- dq .ReadChan ())
200+ Equal (t , int64 (i - 1 ), dq .Depth ())
201+
202+ Equal (t , msg , <- dq .PeekChan ())
203+ Equal (t , int64 (i - 1 ), dq .Depth ())
204+ }
205+
206+ Nil (t , dq .Empty ())
207+ })
208+
209+ }
210+
134211func assertFileNotExist (t * testing.T , fn string ) {
135212 f , err := os .OpenFile (fn , os .O_RDONLY , 0600 )
136213 Equal (t , (* os .File )(nil ), f )
@@ -1260,80 +1337,3 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
12601337 <- dq .ReadChan ()
12611338 }
12621339}
1263-
1264- func TestDiskQueuePeek (t * testing.T ) {
1265- l := NewTestLogger (t )
1266- dqName := "test_disk_queue_peek" + strconv .Itoa (int (time .Now ().Unix ()))
1267- tmpDir , err := ioutil .TempDir ("" , fmt .Sprintf ("nsq-test-%d" , time .Now ().UnixNano ()))
1268- if err != nil {
1269- panic (err )
1270- }
1271- defer os .RemoveAll (tmpDir )
1272- msg := bytes .Repeat ([]byte {0 }, 10 )
1273- ml := int64 (len (msg ))
1274- dq := New (dqName , tmpDir , 10 * (ml + 4 ), int32 (ml ), 1 << 10 , 2500 , 2 * time .Second , l )
1275- defer dq .Close ()
1276- NotNil (t , dq )
1277- Equal (t , int64 (0 ), dq .Depth ())
1278-
1279- t .Run ("roll" , func (t * testing.T ) {
1280- for i := 0 ; i < 10 ; i ++ {
1281- err := dq .Put (msg )
1282- Nil (t , err )
1283- Equal (t , int64 (i + 1 ), dq .Depth ())
1284- }
1285-
1286- for i := 10 ; i > 0 ; i -- {
1287- Equal (t , msg , <- dq .PeekChan ())
1288- Equal (t , int64 (i ), dq .Depth ())
1289-
1290- Equal (t , msg , <- dq .ReadChan ())
1291- Equal (t , int64 (i - 1 ), dq .Depth ())
1292- }
1293-
1294- Nil (t , dq .Empty ())
1295- })
1296-
1297- t .Run ("peek-read" , func (t * testing.T ) {
1298- for i := 0 ; i < 10 ; i ++ {
1299- err := dq .Put (msg )
1300- Nil (t , err )
1301- Equal (t , int64 (i + 1 ), dq .Depth ())
1302- }
1303-
1304- for i := 10 ; i > 0 ; i -- {
1305- Equal (t , msg , <- dq .PeekChan ())
1306- Equal (t , int64 (i ), dq .Depth ())
1307-
1308- Equal (t , msg , <- dq .PeekChan ())
1309- Equal (t , int64 (i ), dq .Depth ())
1310-
1311- Equal (t , msg , <- dq .ReadChan ())
1312- Equal (t , int64 (i - 1 ), dq .Depth ())
1313- }
1314-
1315- Nil (t , dq .Empty ())
1316- })
1317-
1318- t .Run ("read-peek" , func (t * testing.T ) {
1319- for i := 0 ; i < 10 ; i ++ {
1320- err := dq .Put (msg )
1321- Nil (t , err )
1322- Equal (t , int64 (i + 1 ), dq .Depth ())
1323- }
1324-
1325- for i := 10 ; i > 1 ; i -- {
1326- Equal (t , msg , <- dq .PeekChan ())
1327- Equal (t , int64 (i ), dq .Depth ())
1328-
1329- Equal (t , msg , <- dq .ReadChan ())
1330- Equal (t , int64 (i - 1 ), dq .Depth ())
1331-
1332- Equal (t , msg , <- dq .PeekChan ())
1333- Equal (t , int64 (i - 1 ), dq .Depth ())
1334- }
1335-
1336- Nil (t , dq .Empty ())
1337- })
1338-
1339- }
0 commit comments