@@ -14,8 +14,12 @@ limitations under the License.
1414package servicebusqueue_test
1515
1616import (
17+ "bytes"
1718 "context"
19+ "crypto/rand"
20+ "encoding/base64"
1821 "fmt"
22+ "io"
1923 "testing"
2024 "time"
2125
@@ -49,6 +53,15 @@ const (
4953 numMessages = 100
5054)
5155
56+ var testprefix string
57+
58+ func init () {
59+ // Generate a random test prefix
60+ rnd := make ([]byte , 7 )
61+ io .ReadFull (rand .Reader , rnd )
62+ testprefix = base64 .RawURLEncoding .EncodeToString (rnd )
63+ }
64+
5265func TestServiceBusQueue (t * testing.T ) {
5366 messagesFor1 := watcher .NewOrdered ()
5467 messagesFor2 := watcher .NewOrdered ()
@@ -67,11 +80,11 @@ func TestServiceBusQueue(t *testing.T) {
6780 msgsFor1 := make ([]string , numMessages / 2 )
6881 msgsFor2 := make ([]string , numMessages / 2 )
6982 for i := 0 ; i < numMessages / 2 ; i ++ {
70- msgsFor1 [i ] = fmt .Sprintf ("sb-binding-1: Message %03d" , i )
83+ msgsFor1 [i ] = fmt .Sprintf ("%s: sb-binding-1: Message %03d" , testprefix , i )
7184 }
7285
7386 for i := numMessages / 2 ; i < numMessages ; i ++ {
74- msgsFor2 [i - (numMessages / 2 )] = fmt .Sprintf ("sb-binding-2: Message %03d" , i )
87+ msgsFor2 [i - (numMessages / 2 )] = fmt .Sprintf ("%s: sb-binding-2: Message %03d" , testprefix , i )
7588 }
7689
7790 messagesFor1 .ExpectStrings (msgsFor1 ... )
@@ -108,11 +121,19 @@ func TestServiceBusQueue(t *testing.T) {
108121 // Setup the input binding endpoints
109122 err = multierr .Combine (err ,
110123 s .AddBindingInvocationHandler ("sb-binding-1" , func (_ context.Context , in * common.BindingEvent ) ([]byte , error ) {
124+ if ! bytes .HasPrefix (in .Data , []byte (testprefix )) {
125+ return []byte ("{}" ), nil
126+ }
127+
111128 messagesFor1 .Observe (string (in .Data ))
112129 ctx .Logf ("Got message: %s" , string (in .Data ))
113130 return []byte ("{}" ), nil
114131 }),
115132 s .AddBindingInvocationHandler ("sb-binding-2" , func (_ context.Context , in * common.BindingEvent ) ([]byte , error ) {
133+ if ! bytes .HasPrefix (in .Data , []byte (testprefix )) {
134+ return []byte ("{}" ), nil
135+ }
136+
116137 messagesFor2 .Observe (string (in .Data ))
117138 ctx .Logf ("Got message: %s" , string (in .Data ))
118139 return []byte ("{}" ), nil
@@ -128,7 +149,7 @@ func TestServiceBusQueue(t *testing.T) {
128149 embedded .WithAppProtocol (runtime .HTTPProtocol , appPort ),
129150 embedded .WithDaprGRPCPort (grpcPort ),
130151 embedded .WithDaprHTTPPort (httpPort ),
131- embedded .WithComponentsPath ("./components/standard" ),
152+ embedded .WithResourcesPath ("./components/standard" ),
132153 componentRuntimeOptions (),
133154 )).
134155 // Block the standard AMPQ ports.
@@ -151,23 +172,38 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) {
151172
152173 ctx .Logf ("Sending messages for expiration." )
153174 for i := 0 ; i < numMessages ; i ++ {
154- msg := fmt .Sprintf ("Expiring message %d" , i )
175+ msg := fmt .Sprintf ("%s: Expiring message %d" , testprefix , i )
155176
156177 metadata := make (map [string ]string )
157178
158179 // Send to the queue with TTL.
159- queueTTLReq := & daprClient.InvokeBindingRequest {Name : "queuettl" , Operation : "create" , Data : []byte (msg ), Metadata : metadata }
180+ queueTTLReq := & daprClient.InvokeBindingRequest {
181+ Name : "queuettl" ,
182+ Operation : "create" ,
183+ Data : []byte (msg ),
184+ Metadata : metadata ,
185+ }
160186 err := client .InvokeOutputBinding (ctx , queueTTLReq )
161187 require .NoError (ctx , err , "error publishing message" )
162188
163189 // Send message with TTL.
164- messageTTLReq := & daprClient.InvokeBindingRequest {Name : "messagettl" , Operation : "create" , Data : []byte (msg ), Metadata : metadata }
190+ messageTTLReq := & daprClient.InvokeBindingRequest {
191+ Name : "messagettl" ,
192+ Operation : "create" ,
193+ Data : []byte (msg ),
194+ Metadata : metadata ,
195+ }
165196 messageTTLReq .Metadata ["ttlInSeconds" ] = "10"
166197 err = client .InvokeOutputBinding (ctx , messageTTLReq )
167198 require .NoError (ctx , err , "error publishing message" )
168199
169200 // Send message with TTL to ensure it overwrites Queue TTL.
170- mixedTTLReq := & daprClient.InvokeBindingRequest {Name : "mixedttl" , Operation : "create" , Data : []byte (msg ), Metadata : metadata }
201+ mixedTTLReq := & daprClient.InvokeBindingRequest {
202+ Name : "mixedttl" ,
203+ Operation : "create" ,
204+ Data : []byte (msg ),
205+ Metadata : metadata ,
206+ }
171207 mixedTTLReq .Metadata ["ttlInSeconds" ] = "10"
172208 err = client .InvokeOutputBinding (ctx , mixedTTLReq )
173209 require .NoError (ctx , err , "error publishing message" )
@@ -182,16 +218,28 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) {
182218 // Setup the input binding endpoints
183219 err = multierr .Combine (err ,
184220 s .AddBindingInvocationHandler ("queuettl" , func (_ context.Context , in * common.BindingEvent ) ([]byte , error ) {
221+ if ! bytes .HasPrefix (in .Data , []byte (testprefix )) {
222+ return []byte ("{}" ), nil
223+ }
224+
185225 ctx .Logf ("Oh no! Got message: %s" , string (in .Data ))
186226 ttlMessages .FailIfNotExpected (t , string (in .Data ))
187227 return []byte ("{}" ), nil
188228 }),
189229 s .AddBindingInvocationHandler ("messagettl" , func (_ context.Context , in * common.BindingEvent ) ([]byte , error ) {
230+ if ! bytes .HasPrefix (in .Data , []byte (testprefix )) {
231+ return []byte ("{}" ), nil
232+ }
233+
190234 ctx .Logf ("Oh no! Got message: %s" , string (in .Data ))
191235 ttlMessages .FailIfNotExpected (t , string (in .Data ))
192236 return []byte ("{}" ), nil
193237 }),
194238 s .AddBindingInvocationHandler ("mixedttl" , func (_ context.Context , in * common.BindingEvent ) ([]byte , error ) {
239+ if ! bytes .HasPrefix (in .Data , []byte (testprefix )) {
240+ return []byte ("{}" ), nil
241+ }
242+
195243 ctx .Logf ("Oh no! Got message: %s" , string (in .Data ))
196244 ttlMessages .FailIfNotExpected (t , string (in .Data ))
197245 return []byte ("{}" ), nil
@@ -207,7 +255,7 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) {
207255 embedded .WithoutApp (),
208256 embedded .WithDaprGRPCPort (grpcPort ),
209257 embedded .WithDaprHTTPPort (httpPort ),
210- embedded .WithComponentsPath ("./components/ttl" ),
258+ embedded .WithResourcesPath ("./components/ttl" ),
211259 componentRuntimeOptions (),
212260 )).
213261 Step ("send ttl messages" , sendTTLMessages ).
@@ -242,7 +290,7 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
242290 // that will satisfy the test.
243291 msgs := make ([]string , numMessages / 2 )
244292 for i := 0 ; i < numMessages / 2 ; i ++ {
245- msgs [i ] = fmt .Sprintf ("Message %03d" , i )
293+ msgs [i ] = fmt .Sprintf ("%s: Message %03d" , testprefix , i )
246294 }
247295
248296 messages .ExpectStrings (msgs ... )
@@ -252,7 +300,11 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
252300 for _ , msg := range msgs {
253301 ctx .Logf ("Sending: %q" , msg )
254302
255- req := & daprClient.InvokeBindingRequest {Name : "retry-binding" , Operation : "create" , Data : []byte (msg )}
303+ req := & daprClient.InvokeBindingRequest {
304+ Name : "retry-binding" ,
305+ Operation : "create" ,
306+ Data : []byte (msg ),
307+ }
256308 err := client .InvokeOutputBinding (ctx , req )
257309 require .NoError (ctx , err , "error publishing message" )
258310 }
@@ -271,6 +323,10 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
271323 // Setup the input binding endpoint
272324 err = multierr .Combine (err ,
273325 s .AddBindingInvocationHandler ("retry-binding" , func (_ context.Context , in * common.BindingEvent ) ([]byte , error ) {
326+ if ! bytes .HasPrefix (in .Data , []byte (testprefix )) {
327+ return []byte ("{}" ), nil
328+ }
329+
274330 if err := sim (); err != nil {
275331 ctx .Logf ("Failing message: %s" , string (in .Data ))
276332 return nil , err
@@ -291,7 +347,7 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
291347 embedded .WithAppProtocol (runtime .HTTPProtocol , appPort ),
292348 embedded .WithDaprGRPCPort (grpcPort ),
293349 embedded .WithDaprHTTPPort (httpPort ),
294- embedded .WithComponentsPath ("./components/retry" ),
350+ embedded .WithResourcesPath ("./components/retry" ),
295351 componentRuntimeOptions (),
296352 )).
297353 Step ("send and wait" , test ).
@@ -312,10 +368,17 @@ func TestServiceBusQueueMetadata(t *testing.T) {
312368
313369 // Send events that the application above will observe.
314370 ctx .Log ("Invoking binding!" )
315- req := & daprClient.InvokeBindingRequest {Name : "sb-binding-1" , Operation : "create" , Data : []byte ("test msg" ), Metadata : map [string ]string {"Testmetadata" : "Some Metadata" }}
371+ req := & daprClient.InvokeBindingRequest {
372+ Name : "sb-binding-1" ,
373+ Operation : "create" ,
374+ Data : []byte (testprefix + ": test msg" ),
375+ Metadata : map [string ]string {"Testmetadata" : "Some Metadata" },
376+ }
316377 err = client .InvokeOutputBinding (ctx , req )
317378 require .NoError (ctx , err , "error publishing message" )
318379
380+ messages .ExpectStrings (string (req .Data ))
381+
319382 // Do the messages we observed match what we expect?
320383 messages .Assert (ctx , time .Minute )
321384
@@ -327,11 +390,15 @@ func TestServiceBusQueueMetadata(t *testing.T) {
327390 // Setup the input binding endpoints
328391 err = multierr .Combine (err ,
329392 s .AddBindingInvocationHandler ("sb-binding-1" , func (_ context.Context , in * common.BindingEvent ) ([]byte , error ) {
393+ if ! bytes .HasPrefix (in .Data , []byte (testprefix )) {
394+ return []byte ("{}" ), nil
395+ }
396+
330397 messages .Observe (string (in .Data ))
331- ctx .Logf ("Got message: %s - %+ v" , string (in .Data ), in .Metadata )
332- require .NotEmpty ( t , in .Metadata )
333- require .Contains (t , in .Metadata , "Testmetadata" )
334- require .Equal (t , "Some Metadata" , in .Metadata ["Testmetadata" ])
398+ ctx .Logf ("Got message: %s - %# v" , string (in .Data ), in .Metadata )
399+ require .NotEmptyf ( t , in . Metadata , "Data: %s - Metadata: %#v" , in . Data , in .Metadata )
400+ require .Containsf (t , in .Metadata , "Testmetadata" , "Data: %s - Metadata: %#v" , in . Data , in . Metadata )
401+ require .Equalf (t , "Some+ Metadata" , in .Metadata ["Testmetadata" ], "Data: %s - Metadata: %#v" , in . Data , in . Metadata ) // + because the message is encoded for HTTP headers
335402
336403 return []byte ("{}" ), nil
337404 }))
@@ -346,7 +413,7 @@ func TestServiceBusQueueMetadata(t *testing.T) {
346413 embedded .WithAppProtocol (runtime .HTTPProtocol , appPort ),
347414 embedded .WithDaprGRPCPort (grpcPort ),
348415 embedded .WithDaprHTTPPort (httpPort ),
349- embedded .WithComponentsPath ("./components/standard" ),
416+ embedded .WithResourcesPath ("./components/standard" ),
350417 componentRuntimeOptions (),
351418 )).
352419 Step ("send and wait" , test ).
@@ -364,7 +431,12 @@ func TestServiceBusQueueDisableEntityManagement(t *testing.T) {
364431
365432 // Send events that the application above will observe.
366433 ctx .Log ("Invoking binding!" )
367- req := & daprClient.InvokeBindingRequest {Name : "mgmt-binding" , Operation : "create" , Data : []byte ("test msg" ), Metadata : map [string ]string {"TestMetadata" : "Some Metadata" }}
434+ req := & daprClient.InvokeBindingRequest {
435+ Name : "mgmt-binding" ,
436+ Operation : "create" ,
437+ Data : []byte (testprefix + ": test msg" ),
438+ Metadata : map [string ]string {"TestMetadata" : "Some Metadata" },
439+ }
368440 err = client .InvokeOutputBinding (ctx , req )
369441 require .Error (ctx , err , "error publishing message" )
370442 return nil
@@ -376,7 +448,7 @@ func TestServiceBusQueueDisableEntityManagement(t *testing.T) {
376448 embedded .WithoutApp (),
377449 embedded .WithDaprGRPCPort (grpcPort ),
378450 embedded .WithDaprHTTPPort (httpPort ),
379- embedded .WithComponentsPath ("./components/disable_entity_mgmt" ),
451+ embedded .WithResourcesPath ("./components/disable_entity_mgmt" ),
380452 componentRuntimeOptions (),
381453 )).
382454 Step ("send and wait" , testWithExpectedFailure ).
0 commit comments