11package servicebus
22
3+ // MIT License
4+ //
5+ // Copyright (c) Microsoft Corporation. All rights reserved.
6+ //
7+ // Permission is hereby granted, free of charge, to any person obtaining a copy
8+ // of this software and associated documentation files (the "Software"), to deal
9+ // in the Software without restriction, including without limitation the rights
10+ // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+ // copies of the Software, and to permit persons to whom the Software is
12+ // furnished to do so, subject to the following conditions:
13+ //
14+ // The above copyright notice and this permission notice shall be included in all
15+ // copies or substantial portions of the Software.
16+ //
17+ // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+ // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+ // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+ // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+ // SOFTWARE
24+
325import (
426 "context"
527 "math/rand"
@@ -12,7 +34,7 @@ import (
1234)
1335
1436func (suite * serviceBusSuite ) TestQueueSendReceiveWithLock () {
15- tests := map [string ]func (context.Context , * testing.T , * Queue ){
37+ tests := map [string ]func (context.Context , * testing.T , * Queue , int ){
1638 "SimpleSendReceiveWithLock" : testQueueSendAndReceiveWithRenewLock ,
1739 }
1840
@@ -22,13 +44,14 @@ func (suite *serviceBusSuite) TestQueueSendReceiveWithLock() {
2244 queueName := suite .randEntityName ()
2345 ctx , cancel := context .WithTimeout (context .Background (), time .Second * 300 )
2446 defer cancel ()
25- lockDuration := 30 * time .Second
47+ lockDuration := 10 * time .Second
2648
2749 cleanup := makeQueue (ctx , t , ns , queueName , QueueEntityWithLockDuration (& lockDuration ))
28- q , err := ns .NewQueue (queueName )
50+ numMessages := rand .Intn (40 ) + 10
51+ q , err := ns .NewQueue (queueName , QueueWithPrefetchCount (uint32 (numMessages )))
2952 suite .NoError (err )
3053 defer cleanup ()
31- testFunc (ctx , t , q )
54+ testFunc (ctx , t , q , numMessages )
3255 suite .NoError (q .Close (ctx ))
3356 if ! t .Failed () {
3457 // If there are message on the queue this would mean that a lock wasn't held and the message was requeued.
@@ -39,17 +62,18 @@ func (suite *serviceBusSuite) TestQueueSendReceiveWithLock() {
3962 }
4063}
4164
42- func testQueueSendAndReceiveWithRenewLock (ctx context.Context , t * testing.T , queue * Queue ) {
65+ func testQueueSendAndReceiveWithRenewLock (ctx context.Context , t * testing.T , queue * Queue , numMessages int ) {
4366 ttl := 5 * time .Minute
44- numMessages := rand .Intn (40 ) + 20
4567 activeMessages := make ([]* Message , 0 , numMessages )
4668 expected := make (map [string ]int , numMessages )
4769 seen := make (map [string ]int , numMessages )
4870 errs := make (chan error , 1 )
4971
50- renewEvery := time .Second * 20
51- processingTime := time .Second * 100
52-
72+ renewEvery := time .Second * 6
73+ // all are held in memory. we wait a bit longer than the lock expiry set to 10s
74+ processingTime := 15 * time .Second
75+ t .Logf ("processing time : %s" , processingTime )
76+ t .Logf ("processing time : %s \n " , processingTime )
5377 t .Logf ("Sending/receiving %d messages" , numMessages )
5478
5579 // Receiving Loop
@@ -59,7 +83,7 @@ func testQueueSendAndReceiveWithRenewLock(ctx context.Context, t *testing.T, que
5983 errs <- queue .Receive (inner , HandlerFunc (func (ctx context.Context , msg * Message ) error {
6084 numSeen ++
6185 seen [string (msg .Data )]++
62-
86+ t . Logf ( "handling message %d - %s \n " , numSeen , msg . LockToken )
6387 activeMessages = append (activeMessages , msg )
6488 if numSeen >= numMessages {
6589 cancel ()
@@ -81,6 +105,7 @@ func testQueueSendAndReceiveWithRenewLock(ctx context.Context, t *testing.T, que
81105 if err != nil && runRenewal {
82106 t .Error (err )
83107 }
108+ t .Logf ("renewed locks successfuly for %d messages\n " , len (activeMessages ))
84109 }
85110 }()
86111
@@ -100,6 +125,7 @@ func testQueueSendAndReceiveWithRenewLock(ctx context.Context, t *testing.T, que
100125
101126 // Then finally accept all the messages we're holding locks on
102127 for _ , msg := range activeMessages {
128+ t .Logf ("completing %d messages" , len (activeMessages ))
103129 assert .NoError (t , msg .Complete (ctx ))
104130 }
105131
0 commit comments