Skip to content

Commit fd158f9

Browse files
willvelidayaron2cicoyle
authored
Adding initialVisibilityDelay option to Azure Storage Queue binding (#3820)
Signed-off-by: Will Velida <[email protected]> Co-authored-by: Yaron Schneider <[email protected]> Co-authored-by: Cassie Coyle <[email protected]>
1 parent e4a8a3e commit fd158f9

File tree

3 files changed

+129
-61
lines changed

3 files changed

+129
-61
lines changed

bindings/azure/storagequeues/metadata.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# yaml-language-server: $schema=../../../component-metadata-schema.json
1+
# yaml-language-server: $schema=../../../component-metadata-schema.json
22
schemaVersion: "v1"
33
type: "bindings"
44
name: "azure.storagequeues"
@@ -88,4 +88,13 @@ metadata:
8888
binding:
8989
output: false
9090
input: true
91+
- name: "initialVisibilityDelay"
92+
type: duration
93+
description: |
94+
Sets a delay before a message becomes visible in the queue after being added.
95+
It can also be specified per message by setting the `initialVisibilityDelay` property in the invocation request's metadata.
96+
example: '30s'
97+
binding:
98+
output: true
99+
input: false
91100

bindings/azure/storagequeues/storagequeues.go

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,16 @@ import (
3636
)
3737

3838
const (
39-
defaultTTL = 10 * time.Minute
40-
defaultVisibilityTimeout = 30 * time.Second
41-
defaultPollingInterval = 10 * time.Second
42-
dequeueCount = "dequeueCount"
43-
insertionTime = "insertionTime"
44-
expirationTime = "expirationTime"
45-
nextVisibleTime = "nextVisibleTime"
46-
popReceipt = "popReceipt"
47-
messageID = "messageID"
39+
defaultTTL = 10 * time.Minute
40+
defaultVisibilityTimeout = 30 * time.Second
41+
defaultPollingInterval = 10 * time.Second
42+
defaultInitialVisibilityDelay = 0 * time.Second
43+
dequeueCount = "dequeueCount"
44+
insertionTime = "insertionTime"
45+
expirationTime = "expirationTime"
46+
nextVisibleTime = "nextVisibleTime"
47+
popReceipt = "popReceipt"
48+
messageID = "messageID"
4849
)
4950

5051
type consumer struct {
@@ -54,7 +55,7 @@ type consumer struct {
5455
// QueueHelper enables injection for testnig.
5556
type QueueHelper interface {
5657
Init(ctx context.Context, metadata bindings.Metadata) (*storageQueuesMetadata, error)
57-
Write(ctx context.Context, data []byte, ttl *time.Duration) error
58+
Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error
5859
Read(ctx context.Context, consumer *consumer) error
5960
Close() error
6061
}
@@ -129,7 +130,7 @@ func (d *AzureQueueHelper) Init(ctx context.Context, meta bindings.Metadata) (*s
129130
return m, nil
130131
}
131132

132-
func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Duration) error {
133+
func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error {
133134
var ttlSeconds *int32
134135
if ttl != nil {
135136
ttlSeconds = ptr.Of(int32(ttl.Seconds()))
@@ -146,9 +147,16 @@ func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Dur
146147
s = base64.StdEncoding.EncodeToString([]byte(s))
147148
}
148149

149-
_, err = d.queueClient.EnqueueMessage(ctx, s, &azqueue.EnqueueMessageOptions{
150+
options := &azqueue.EnqueueMessageOptions{
150151
TimeToLive: ttlSeconds,
151-
})
152+
}
153+
154+
// Add the initial visibility delay if specified
155+
if initialVisibilityDelay != nil {
156+
options.VisibilityTimeout = ptr.Of(int32(initialVisibilityDelay.Seconds()))
157+
}
158+
159+
_, err = d.queueClient.EnqueueMessage(ctx, s, options)
152160

153161
return err
154162
}
@@ -248,15 +256,16 @@ type AzureStorageQueues struct {
248256
}
249257

250258
type storageQueuesMetadata struct {
251-
QueueName string
252-
QueueEndpoint string
253-
AccountName string
254-
AccountKey string
255-
DecodeBase64 bool
256-
EncodeBase64 bool
257-
PollingInterval time.Duration `mapstructure:"pollingInterval"`
258-
TTL *time.Duration `mapstructure:"ttl" mapstructurealiases:"ttlInSeconds"`
259-
VisibilityTimeout *time.Duration
259+
QueueName string
260+
QueueEndpoint string
261+
AccountName string
262+
AccountKey string
263+
DecodeBase64 bool
264+
EncodeBase64 bool
265+
PollingInterval time.Duration `mapstructure:"pollingInterval"`
266+
TTL *time.Duration `mapstructure:"ttl" mapstructurealiases:"ttlInSeconds"`
267+
VisibilityTimeout *time.Duration
268+
InitialVisibilityDelay *time.Duration `mapstructure:"initialVisibilityDelay"`
260269
}
261270

262271
func (m *storageQueuesMetadata) GetQueueURL(azEnvSettings azauth.EnvironmentSettings) string {
@@ -350,7 +359,17 @@ func (a *AzureStorageQueues) Invoke(ctx context.Context, req *bindings.InvokeReq
350359
ttlToUse = &ttl
351360
}
352361

353-
err = a.helper.Write(ctx, req.Data, ttlToUse)
362+
// Get the initial visibility delay from request metadata, or use the component's metadata
363+
initialVisibilityDelayToUse := a.metadata.InitialVisibilityDelay
364+
if val, ok := req.Metadata["initialVisibilityDelay"]; ok && val != "" {
365+
duration, parseErr := time.ParseDuration(val)
366+
if parseErr != nil {
367+
return nil, fmt.Errorf("invalid value for initialVisibilityDelay: %w", parseErr)
368+
}
369+
initialVisibilityDelayToUse = &duration
370+
}
371+
372+
err = a.helper.Write(ctx, req.Data, ttlToUse, initialVisibilityDelayToUse)
354373
if err != nil {
355374
return nil, err
356375
}

bindings/azure/storagequeues/storagequeues_test.go

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ func (m *MockHelper) Init(ctx context.Context, metadata bindings.Metadata) (*sto
4545
return m.metadata, err
4646
}
4747

48-
func (m *MockHelper) Write(ctx context.Context, data []byte, ttl *time.Duration) error {
48+
func (m *MockHelper) Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error {
4949
m.messages <- data
50-
retvals := m.Called(data, ttl)
50+
retvals := m.Called(data, ttl, initialVisibilityDelay)
5151
return retvals.Error(0)
5252
}
5353

@@ -89,6 +89,8 @@ func TestWriteQueue(t *testing.T) {
8989
mm := new(MockHelper)
9090
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
9191
return in == nil
92+
}), mock.MatchedBy(func(in *time.Duration) bool {
93+
return in == nil
9294
})).Return(nil)
9395

9496
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -111,6 +113,8 @@ func TestWriteWithTTLInQueue(t *testing.T) {
111113
mm := new(MockHelper)
112114
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
113115
return in != nil && *in == time.Second
116+
}), mock.MatchedBy(func(in *time.Duration) bool {
117+
return in == nil
114118
})).Return(nil)
115119

116120
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -133,6 +137,8 @@ func TestWriteWithTTLInWrite(t *testing.T) {
133137
mm := new(MockHelper)
134138
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
135139
return in != nil && *in == time.Second
140+
}), mock.MatchedBy(func(in *time.Duration) bool {
141+
return in == nil
136142
})).Return(nil)
137143

138144
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -174,7 +180,7 @@ func TestWriteWithTTLInWrite(t *testing.T) {
174180

175181
func TestReadQueue(t *testing.T) {
176182
mm := new(MockHelper)
177-
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
183+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
178184
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
179185
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
180186

@@ -215,7 +221,7 @@ func TestReadQueue(t *testing.T) {
215221

216222
func TestReadQueueDecode(t *testing.T) {
217223
mm := new(MockHelper)
218-
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
224+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
219225
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
220226

221227
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -286,7 +292,7 @@ func TestReadQueueDecode(t *testing.T) {
286292
*/
287293
func TestReadQueueNoMessage(t *testing.T) {
288294
mm := new(MockHelper)
289-
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
295+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
290296
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
291297

292298
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -316,70 +322,76 @@ func TestReadQueueNoMessage(t *testing.T) {
316322

317323
func TestParseMetadata(t *testing.T) {
318324
oneSecondDuration := time.Second
319-
320325
testCases := []struct {
321326
name string
322327
properties map[string]string
323328
// Account key is parsed in azauth
324329
// expectedAccountKey string
325-
expectedQueueName string
326-
expectedQueueEndpointURL string
327-
expectedPollingInterval time.Duration
328-
expectedTTL *time.Duration
329-
expectedVisibilityTimeout *time.Duration
330+
expectedQueueName string
331+
expectedQueueEndpointURL string
332+
expectedPollingInterval time.Duration
333+
expectedTTL *time.Duration
334+
expectedVisibilityTimeout *time.Duration
335+
expectedInitialVisibilityDelay *time.Duration
330336
}{
331337
{
332338
name: "Account and key",
333339
properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1"},
334340
// expectedAccountKey: "myKey",
335-
expectedQueueName: "queue1",
336-
expectedQueueEndpointURL: "",
337-
expectedPollingInterval: defaultPollingInterval,
338-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
341+
expectedQueueName: "queue1",
342+
expectedQueueEndpointURL: "",
343+
expectedPollingInterval: defaultPollingInterval,
344+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
345+
expectedInitialVisibilityDelay: nil,
339346
},
340347
{
341348
name: "Accout, key, and endpoint",
342349
properties: map[string]string{"accountKey": "myKey", "queueName": "queue1", "storageAccount": "someAccount", "queueEndpointUrl": "https://foo.example.com:10001"},
343350
// expectedAccountKey: "myKey",
344-
expectedQueueName: "queue1",
345-
expectedQueueEndpointURL: "https://foo.example.com:10001",
346-
expectedPollingInterval: defaultPollingInterval,
347-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
351+
expectedQueueName: "queue1",
352+
expectedQueueEndpointURL: "https://foo.example.com:10001",
353+
expectedPollingInterval: defaultPollingInterval,
354+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
355+
expectedInitialVisibilityDelay: nil,
348356
},
349357
{
350358
name: "Empty TTL",
351359
properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: ""},
352360
// expectedAccountKey: "myKey",
353-
expectedQueueName: "queue1",
354-
expectedQueueEndpointURL: "",
355-
expectedPollingInterval: defaultPollingInterval,
356-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
361+
expectedQueueName: "queue1",
362+
expectedQueueEndpointURL: "",
363+
expectedPollingInterval: defaultPollingInterval,
364+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
365+
expectedInitialVisibilityDelay: nil,
357366
},
358367
{
359368
name: "With TTL",
360369
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "1"},
361370
// expectedAccountKey: "myKey",
362-
expectedQueueName: "queue1",
363-
expectedTTL: &oneSecondDuration,
364-
expectedQueueEndpointURL: "",
365-
expectedPollingInterval: defaultPollingInterval,
366-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
371+
expectedQueueName: "queue1",
372+
expectedTTL: &oneSecondDuration,
373+
expectedQueueEndpointURL: "",
374+
expectedPollingInterval: defaultPollingInterval,
375+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
376+
expectedInitialVisibilityDelay: nil,
367377
},
368378
{
369-
name: "With visibility timeout",
370-
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "visibilityTimeout": "5s"},
371-
expectedQueueName: "queue1",
372-
expectedPollingInterval: defaultPollingInterval,
373-
expectedVisibilityTimeout: ptr.Of(5 * time.Second),
379+
name: "With visibility timeout",
380+
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "visibilityTimeout": "5s"},
381+
expectedQueueName: "queue1",
382+
expectedPollingInterval: defaultPollingInterval,
383+
expectedVisibilityTimeout: ptr.Of(5 * time.Second),
384+
expectedInitialVisibilityDelay: nil,
374385
},
375386
{
376387
name: "With polling interval",
377388
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "pollingInterval": "2s"},
378389
// expectedAccountKey: "myKey",
379-
expectedQueueName: "queue1",
380-
expectedQueueEndpointURL: "",
381-
expectedPollingInterval: 2 * time.Second,
382-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
390+
expectedQueueName: "queue1",
391+
expectedQueueEndpointURL: "",
392+
expectedPollingInterval: 2 * time.Second,
393+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
394+
expectedInitialVisibilityDelay: nil,
383395
},
384396
}
385397

@@ -448,3 +460,31 @@ func TestParseMetadataWithInvalidTTL(t *testing.T) {
448460
})
449461
}
450462
}
463+
464+
func TestWriteWithInitialVisibilityDelay(t *testing.T) {
465+
mm := new(MockHelper)
466+
expectedDelay := 5 * time.Second
467+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
468+
return in == nil
469+
}), mock.MatchedBy(func(in *time.Duration) bool {
470+
return in != nil && *in == expectedDelay
471+
})).Return(nil)
472+
473+
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
474+
475+
m := bindings.Metadata{}
476+
m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"}
477+
478+
err := a.Init(t.Context(), m)
479+
require.NoError(t, err)
480+
481+
r := bindings.InvokeRequest{
482+
Data: []byte("This is my message"),
483+
Metadata: map[string]string{"initialVisibilityDelay": "5s"},
484+
}
485+
486+
_, err = a.Invoke(t.Context(), &r)
487+
488+
require.NoError(t, err)
489+
require.NoError(t, a.Close())
490+
}

0 commit comments

Comments
 (0)