Skip to content

Commit 154d77d

Browse files
[azservicebus] Adding in an example of how you could write your own lock renewer (Azure#23168)
Just a simple example for people that want to write their own renewer. Also, for convenience, adding in a .ps1 file to generate the .env file if you already have a resource set up. Fixes Azure#20857
1 parent d55f6d9 commit 154d77d

File tree

3 files changed

+265
-0
lines changed

3 files changed

+265
-0
lines changed

sdk/messaging/azservicebus/env.ps1

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#!/usr/bin/env pwsh
2+
3+
# Copyright (c) Microsoft Corporation. All rights reserved.
4+
# Licensed under the MIT License.
5+
6+
# This script will bootstrap your .env file, given a resource group that's been deployed to.
7+
# We expect a premium and a standard Service Bus namespace for our tests.
8+
#
9+
# You might need to install the Azure Powershell module first and login:
10+
#
11+
# Install-Module -Name Az -Repository PSGallery -Force
12+
# Connect-AzAccount -UseDeviceAuthentication
13+
#
14+
15+
# TODO:
16+
$rg = "<your resource group should go here>"
17+
18+
$contents = ""
19+
20+
Get-AzServiceBusNamespace -ResourceGroup $rg
21+
| ForEach-Object {
22+
$cs = (Get-AzServiceBusKey -ResourceGroup $rg -NamespaceName $_.Name -AuthorizationRuleName RootManageSharedAccessKey).PrimaryConnectionString
23+
$endpoint = $_.ServiceBusEndpoint.Replace("https://", "").Replace(":443/", "")
24+
25+
if ($_.SkuTier -eq "Standard") {
26+
# this is in the .bicep file for this - we create a few keys with different permissions for testing.
27+
$noManageCs = (Get-AzServiceBusKey -ResourceGroup $rg -NamespaceName $_.Name -AuthorizationRuleName NoManage).PrimaryConnectionString
28+
$sendOnlyCs = (Get-AzServiceBusKey -ResourceGroup $rg -NamespaceName $_.Name -AuthorizationRuleName SendOnly).PrimaryConnectionString
29+
$listenOnlyCs = (Get-AzServiceBusKey -ResourceGroup $rg -NamespaceName $_.Name -AuthorizationRuleName ListenOnly).PrimaryConnectionString
30+
31+
$contents += "`n# standard`n"
32+
$contents += "SERVICEBUS_CONNECTION_STRING=$cs`n"
33+
$contents += "SERVICEBUS_ENDPOINT=$endpoint`n"
34+
$contents += "SERVICEBUS_CONNECTION_STRING_NO_MANAGE=$noManageCs`n"
35+
$contents += "SERVICEBUS_CONNECTION_STRING_SEND_ONLY=$sendOnlyCs`n"
36+
$contents += "SERVICEBUS_CONNECTION_STRING_LISTEN_ONLY=$listenOnlyCs`n"
37+
}
38+
else {
39+
# we do a little bit of testing on premium.
40+
$contents += "`n# premium`n"
41+
$contents += "SERVICEBUS_CONNECTION_STRING_PREMIUM=$cs`n"
42+
$contents += "SERVICEBUS_ENDPOINT_PREMIUM=$endpoint`n"
43+
}
44+
}
45+
46+
Out-File ".env" -InputObject $contents
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azservicebus_test
5+
6+
import (
7+
"context"
8+
"errors"
9+
"log"
10+
"time"
11+
12+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
13+
)
14+
15+
// autoRenewMessageLock runs a loop calling [azservicebus.Receiver.RenewLock] until the message has been completed, the lock
16+
// has expired or the passed in context has been cancelled.
17+
//
18+
// This function returns nil if the message's lock has expired.
19+
//
20+
// - receiver - the receiver that received 'msg'
21+
// - msg - the received message
22+
// - configuredLockDuration - the lock duration, configured at the queue or subscription level.
23+
func autoRenewMessageLock(ctx context.Context, receiver *azservicebus.Receiver, msg *azservicebus.ReceivedMessage, configuredLockDuration time.Duration) error {
24+
for {
25+
log.Printf("Renewing lock for %s", msg.MessageID)
26+
if err := receiver.RenewMessageLock(ctx, msg, nil); err != nil {
27+
// You get this error if your lock has expired _or_ if you've settled the message.
28+
// We can ignore it here because the error will also be returned by the settlement
29+
// method (CompleteMessage, AbandonMessage, etc..)
30+
if sbErr := (*azservicebus.Error)(nil); errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
31+
log.Printf("Lock was lost for message with ID %s", msg.MessageID)
32+
return nil
33+
}
34+
35+
return err
36+
}
37+
log.Printf("Renewed lock for %s", msg.MessageID)
38+
39+
select {
40+
case <-ctx.Done():
41+
log.Printf("Cancelling lock renewal for %s", msg.MessageID)
42+
return ctx.Err()
43+
case <-time.After(configuredLockDuration / 2):
44+
}
45+
}
46+
}
47+
48+
func Example_autoRenewLocks() {
49+
// This is configurable, on the service. Change this value to match since it's used
50+
// to figure out how often to renew a lock.
51+
const queueOrSubscriptionLockDuration = 30 * time.Second
52+
53+
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
54+
55+
if err != nil {
56+
// TODO: Update the following line with your application specific error handling logic
57+
log.Printf("ERROR: %s", err)
58+
return
59+
}
60+
61+
for _, msg := range messages {
62+
go func(msg *azservicebus.ReceivedMessage) {
63+
//
64+
// Each goroutine will _eventually_ stop once its message has been settled.
65+
//
66+
// If you want to be more proactive you can create a separate context for each message
67+
// and cancel that when you settle.
68+
//
69+
err := autoRenewMessageLock(context.TODO(), receiver, msg, queueOrSubscriptionLockDuration)
70+
71+
if err != nil {
72+
// TODO: Update the following line with your application specific error handling logic
73+
log.Printf("ERROR: %s", err)
74+
return
75+
}
76+
}(msg)
77+
}
78+
79+
// TODO: Process the messages.
80+
//
81+
// Our lock renewal goroutines will stop once RenewLock returns that the lock has been lost. This happens
82+
// if the lock has expired or if we've settled the message.
83+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azservicebus_test
5+
6+
import (
7+
"context"
8+
"log"
9+
"testing"
10+
"time"
11+
12+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
13+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
14+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom"
15+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
// NOTE: tests should migrate into here since this is the actual _test package.
20+
21+
func TestRenewLocks(t *testing.T) {
22+
client := test.NewClient(t, test.NewClientArgs[azservicebus.ClientOptions, azservicebus.Client]{
23+
NewClientFromConnectionString: azservicebus.NewClientFromConnectionString,
24+
NewClient: azservicebus.NewClient,
25+
}, nil)
26+
27+
queueName, cleanupQueue := test.CreateExpiringQueue(t, &atom.QueueDescription{
28+
LockDuration: to.Ptr("PT5S"),
29+
})
30+
31+
defer cleanupQueue()
32+
33+
sender, err := client.NewSender(queueName, nil)
34+
require.NoError(t, err)
35+
36+
defer test.RequireClose(t, sender)
37+
38+
receiver, err := client.NewReceiverForQueue(queueName, nil)
39+
require.NoError(t, err)
40+
41+
defer test.RequireClose(t, receiver)
42+
43+
err = sender.SendMessage(context.Background(), &azservicebus.Message{
44+
Body: []byte("hello world"),
45+
}, nil)
46+
require.NoError(t, err)
47+
48+
messages, err := receiver.ReceiveMessages(context.Background(), 1, nil)
49+
require.NoError(t, err)
50+
51+
errCh := make(chan error)
52+
53+
go func() {
54+
err = autoRenewMessageLock(context.Background(), receiver, messages[0], 5*time.Second)
55+
errCh <- err
56+
}()
57+
58+
time.Sleep(10 * time.Second) // make sure we renew a few times.
59+
60+
log.Printf("Completing message")
61+
err = receiver.CompleteMessage(context.Background(), messages[0], nil)
62+
require.NoError(t, err)
63+
log.Printf("Completed message")
64+
65+
select {
66+
case err := <-errCh:
67+
require.NoError(t, err)
68+
case <-time.After(5 * time.Second):
69+
require.Fail(t, "goroutine took longer than 5 seconds to complete")
70+
}
71+
}
72+
73+
func TestRenewLocksEarlyCancel(t *testing.T) {
74+
client := test.NewClient(t, test.NewClientArgs[azservicebus.ClientOptions, azservicebus.Client]{
75+
NewClientFromConnectionString: azservicebus.NewClientFromConnectionString,
76+
NewClient: azservicebus.NewClient,
77+
}, nil)
78+
79+
queueName, cleanupQueue := test.CreateExpiringQueue(t, &atom.QueueDescription{
80+
// this time we're bumping up the time. We want to make sure we're cancelling.
81+
LockDuration: to.Ptr("PT1M"),
82+
})
83+
84+
defer cleanupQueue()
85+
86+
sender, err := client.NewSender(queueName, nil)
87+
require.NoError(t, err)
88+
89+
defer test.RequireClose(t, sender)
90+
91+
receiver, err := client.NewReceiverForQueue(queueName, nil)
92+
require.NoError(t, err)
93+
94+
defer test.RequireClose(t, receiver)
95+
96+
err = sender.SendMessage(context.Background(), &azservicebus.Message{
97+
Body: []byte("hello world"),
98+
}, nil)
99+
require.NoError(t, err)
100+
101+
messages, err := receiver.ReceiveMessages(context.Background(), 1, nil)
102+
require.NoError(t, err)
103+
104+
t.Run("ContextAlreadyCancelled", func(t *testing.T) {
105+
// pre-cancelled.
106+
ctx, cancel := context.WithCancel(context.Background())
107+
cancel()
108+
109+
log.Printf("Auto renewing lock, but the context is already cancelled")
110+
err = autoRenewMessageLock(ctx, receiver, messages[0], time.Minute)
111+
log.Printf("Auto renewer returned %s", err)
112+
require.ErrorIs(t, err, context.Canceled)
113+
})
114+
115+
t.Run("ContextCancelledAfterTenSeconds", func(t *testing.T) {
116+
// now cancel _after_ the first renewal has taken place (by sleeping a bit...)
117+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
118+
defer cancel()
119+
120+
errCh := make(chan error)
121+
122+
log.Printf("Auto renewing lock, context will cancel after 10 seconds")
123+
go func() {
124+
err = autoRenewMessageLock(ctx, receiver, messages[0], time.Minute)
125+
errCh <- err
126+
}()
127+
128+
select {
129+
case err := <-errCh:
130+
log.Printf("Auto renewer returned %s", err)
131+
require.ErrorIs(t, err, context.DeadlineExceeded)
132+
case <-time.After(15 * time.Second):
133+
require.Fail(t, "goroutine took longer than 15 seconds to cancel")
134+
}
135+
})
136+
}

0 commit comments

Comments
 (0)