Skip to content

Commit 0120009

Browse files
authored
Merge branch 'main' into 3318-RavenDB-state-store-new
Signed-off-by: Nemanja Malocic <[email protected]>
2 parents 4bbe532 + e72dbb6 commit 0120009

File tree

43 files changed

+1753
-151
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1753
-151
lines changed

.build-tools/component-folders.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ componentFolders:
99
- secretstores
1010
- state
1111
- workflows
12+
- conversation
1213

1314
excludeFolders:
1415
- bindings/alicloud

.build-tools/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
)
1313

1414
require (
15-
github.com/dapr/kit v0.15.3-0.20250516121556-bc7dc566c45d // indirect
15+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69 // indirect
1616
github.com/gogo/protobuf v1.3.2 // indirect
1717
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 // indirect
1818
github.com/inconshreveable/mousetrap v1.0.1 // indirect

.build-tools/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
2-
github.com/dapr/kit v0.15.3-0.20250516121556-bc7dc566c45d h1:v+kZn9ami23xBsruyZmKErIOSlCdW9pR8wfHUg5+jys=
3-
github.com/dapr/kit v0.15.3-0.20250516121556-bc7dc566c45d/go.mod h1:6w2Pr38zOAtBn+ld/jknwI4kgMfwanCIcFVnPykdPZQ=
2+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69 h1:I1Uoy3fn906AZZdG8+n8fHitgY7Wn9c+smz4WQdOy1Q=
3+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69/go.mod h1:6w2Pr38zOAtBn+ld/jknwI4kgMfwanCIcFVnPykdPZQ=
44
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
55
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
66
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

.build-tools/pkg/metadataschema/schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type ComponentMetadata struct {
2020
// Version of the component metadata schema.
2121
SchemaVersion string `json:"schemaVersion" yaml:"schemaVersion" jsonschema:"enum=v1"`
2222
// Component type, of one of the allowed values.
23-
Type string `json:"type" yaml:"type" jsonschema:"enum=bindings,enum=state,enum=secretstores,enum=pubsub,enum=workflows,enum=configuration,enum=lock,enum=middleware,enum=crypto"`
23+
Type string `json:"type" yaml:"type" jsonschema:"enum=bindings,enum=state,enum=secretstores,enum=pubsub,enum=workflows,enum=configuration,enum=lock,enum=middleware,enum=crypto,enum=conversation"`
2424
// Name of the component (without the inital type, e.g. "http" instead of "bindings.http").
2525
Name string `json:"name" yaml:"name"`
2626
// Version of the component, with the leading "v", e.g. "v1".
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
services:
2+
coherence:
3+
image: 'ghcr.io/oracle/coherence-ce:14.1.2-0-2-java17'
4+
environment:
5+
- coherence.management.http=all
6+
- coherence.management.http.port=30000
7+
- Dcoherence.health.http.port=6676
8+
- coherence.wka=127.0.0.1
9+
ports:
10+
- 30000:30000
11+
- 1408:1408
12+
- 9612:9612
13+
- 6676:6676

bindings/azure/eventhubs/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ metadata:
164164
- name: getAllMessageProperties
165165
type: bool
166166
required: false
167-
default: false
167+
default: "false"
168168
example: "false"
169169
binding:
170170
input: true

bindings/azure/storagequeues/metadata.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# yaml-language-server: $schema=../../../component-metadata-schema.json
1+
# yaml-language-server: $schema=../../../component-metadata-schema.json
22
schemaVersion: "v1"
33
type: "bindings"
44
name: "azure.storagequeues"
@@ -88,4 +88,13 @@ metadata:
8888
binding:
8989
output: false
9090
input: true
91+
- name: "initialVisibilityDelay"
92+
type: duration
93+
description: |
94+
Sets a delay before a message becomes visible in the queue after being added.
95+
It can also be specified per message by setting the `initialVisibilityDelay` property in the invocation request's metadata.
96+
example: '30s'
97+
binding:
98+
output: true
99+
input: false
91100

bindings/azure/storagequeues/storagequeues.go

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,16 @@ import (
3636
)
3737

3838
const (
39-
defaultTTL = 10 * time.Minute
40-
defaultVisibilityTimeout = 30 * time.Second
41-
defaultPollingInterval = 10 * time.Second
42-
dequeueCount = "dequeueCount"
43-
insertionTime = "insertionTime"
44-
expirationTime = "expirationTime"
45-
nextVisibleTime = "nextVisibleTime"
46-
popReceipt = "popReceipt"
47-
messageID = "messageID"
39+
defaultTTL = 10 * time.Minute
40+
defaultVisibilityTimeout = 30 * time.Second
41+
defaultPollingInterval = 10 * time.Second
42+
defaultInitialVisibilityDelay = 0 * time.Second
43+
dequeueCount = "dequeueCount"
44+
insertionTime = "insertionTime"
45+
expirationTime = "expirationTime"
46+
nextVisibleTime = "nextVisibleTime"
47+
popReceipt = "popReceipt"
48+
messageID = "messageID"
4849
)
4950

5051
type consumer struct {
@@ -54,7 +55,7 @@ type consumer struct {
5455
// QueueHelper enables injection for testnig.
5556
type QueueHelper interface {
5657
Init(ctx context.Context, metadata bindings.Metadata) (*storageQueuesMetadata, error)
57-
Write(ctx context.Context, data []byte, ttl *time.Duration) error
58+
Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error
5859
Read(ctx context.Context, consumer *consumer) error
5960
Close() error
6061
}
@@ -129,7 +130,7 @@ func (d *AzureQueueHelper) Init(ctx context.Context, meta bindings.Metadata) (*s
129130
return m, nil
130131
}
131132

132-
func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Duration) error {
133+
func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error {
133134
var ttlSeconds *int32
134135
if ttl != nil {
135136
ttlSeconds = ptr.Of(int32(ttl.Seconds()))
@@ -146,9 +147,16 @@ func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Dur
146147
s = base64.StdEncoding.EncodeToString([]byte(s))
147148
}
148149

149-
_, err = d.queueClient.EnqueueMessage(ctx, s, &azqueue.EnqueueMessageOptions{
150+
options := &azqueue.EnqueueMessageOptions{
150151
TimeToLive: ttlSeconds,
151-
})
152+
}
153+
154+
// Add the initial visibility delay if specified
155+
if initialVisibilityDelay != nil {
156+
options.VisibilityTimeout = ptr.Of(int32(initialVisibilityDelay.Seconds()))
157+
}
158+
159+
_, err = d.queueClient.EnqueueMessage(ctx, s, options)
152160

153161
return err
154162
}
@@ -248,15 +256,16 @@ type AzureStorageQueues struct {
248256
}
249257

250258
type storageQueuesMetadata struct {
251-
QueueName string
252-
QueueEndpoint string
253-
AccountName string
254-
AccountKey string
255-
DecodeBase64 bool
256-
EncodeBase64 bool
257-
PollingInterval time.Duration `mapstructure:"pollingInterval"`
258-
TTL *time.Duration `mapstructure:"ttl" mapstructurealiases:"ttlInSeconds"`
259-
VisibilityTimeout *time.Duration
259+
QueueName string
260+
QueueEndpoint string
261+
AccountName string
262+
AccountKey string
263+
DecodeBase64 bool
264+
EncodeBase64 bool
265+
PollingInterval time.Duration `mapstructure:"pollingInterval"`
266+
TTL *time.Duration `mapstructure:"ttl" mapstructurealiases:"ttlInSeconds"`
267+
VisibilityTimeout *time.Duration
268+
InitialVisibilityDelay *time.Duration `mapstructure:"initialVisibilityDelay"`
260269
}
261270

262271
func (m *storageQueuesMetadata) GetQueueURL(azEnvSettings azauth.EnvironmentSettings) string {
@@ -350,7 +359,17 @@ func (a *AzureStorageQueues) Invoke(ctx context.Context, req *bindings.InvokeReq
350359
ttlToUse = &ttl
351360
}
352361

353-
err = a.helper.Write(ctx, req.Data, ttlToUse)
362+
// Get the initial visibility delay from request metadata, or use the component's metadata
363+
initialVisibilityDelayToUse := a.metadata.InitialVisibilityDelay
364+
if val, ok := req.Metadata["initialVisibilityDelay"]; ok && val != "" {
365+
duration, parseErr := time.ParseDuration(val)
366+
if parseErr != nil {
367+
return nil, fmt.Errorf("invalid value for initialVisibilityDelay: %w", parseErr)
368+
}
369+
initialVisibilityDelayToUse = &duration
370+
}
371+
372+
err = a.helper.Write(ctx, req.Data, ttlToUse, initialVisibilityDelayToUse)
354373
if err != nil {
355374
return nil, err
356375
}

bindings/azure/storagequeues/storagequeues_test.go

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ func (m *MockHelper) Init(ctx context.Context, metadata bindings.Metadata) (*sto
4545
return m.metadata, err
4646
}
4747

48-
func (m *MockHelper) Write(ctx context.Context, data []byte, ttl *time.Duration) error {
48+
func (m *MockHelper) Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error {
4949
m.messages <- data
50-
retvals := m.Called(data, ttl)
50+
retvals := m.Called(data, ttl, initialVisibilityDelay)
5151
return retvals.Error(0)
5252
}
5353

@@ -89,6 +89,8 @@ func TestWriteQueue(t *testing.T) {
8989
mm := new(MockHelper)
9090
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
9191
return in == nil
92+
}), mock.MatchedBy(func(in *time.Duration) bool {
93+
return in == nil
9294
})).Return(nil)
9395

9496
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -111,6 +113,8 @@ func TestWriteWithTTLInQueue(t *testing.T) {
111113
mm := new(MockHelper)
112114
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
113115
return in != nil && *in == time.Second
116+
}), mock.MatchedBy(func(in *time.Duration) bool {
117+
return in == nil
114118
})).Return(nil)
115119

116120
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -133,6 +137,8 @@ func TestWriteWithTTLInWrite(t *testing.T) {
133137
mm := new(MockHelper)
134138
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
135139
return in != nil && *in == time.Second
140+
}), mock.MatchedBy(func(in *time.Duration) bool {
141+
return in == nil
136142
})).Return(nil)
137143

138144
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -174,7 +180,7 @@ func TestWriteWithTTLInWrite(t *testing.T) {
174180

175181
func TestReadQueue(t *testing.T) {
176182
mm := new(MockHelper)
177-
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
183+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
178184
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
179185
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
180186

@@ -215,7 +221,7 @@ func TestReadQueue(t *testing.T) {
215221

216222
func TestReadQueueDecode(t *testing.T) {
217223
mm := new(MockHelper)
218-
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
224+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
219225
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
220226

221227
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -286,7 +292,7 @@ func TestReadQueueDecode(t *testing.T) {
286292
*/
287293
func TestReadQueueNoMessage(t *testing.T) {
288294
mm := new(MockHelper)
289-
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
295+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
290296
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
291297

292298
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
@@ -316,70 +322,76 @@ func TestReadQueueNoMessage(t *testing.T) {
316322

317323
func TestParseMetadata(t *testing.T) {
318324
oneSecondDuration := time.Second
319-
320325
testCases := []struct {
321326
name string
322327
properties map[string]string
323328
// Account key is parsed in azauth
324329
// expectedAccountKey string
325-
expectedQueueName string
326-
expectedQueueEndpointURL string
327-
expectedPollingInterval time.Duration
328-
expectedTTL *time.Duration
329-
expectedVisibilityTimeout *time.Duration
330+
expectedQueueName string
331+
expectedQueueEndpointURL string
332+
expectedPollingInterval time.Duration
333+
expectedTTL *time.Duration
334+
expectedVisibilityTimeout *time.Duration
335+
expectedInitialVisibilityDelay *time.Duration
330336
}{
331337
{
332338
name: "Account and key",
333339
properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1"},
334340
// expectedAccountKey: "myKey",
335-
expectedQueueName: "queue1",
336-
expectedQueueEndpointURL: "",
337-
expectedPollingInterval: defaultPollingInterval,
338-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
341+
expectedQueueName: "queue1",
342+
expectedQueueEndpointURL: "",
343+
expectedPollingInterval: defaultPollingInterval,
344+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
345+
expectedInitialVisibilityDelay: nil,
339346
},
340347
{
341348
name: "Accout, key, and endpoint",
342349
properties: map[string]string{"accountKey": "myKey", "queueName": "queue1", "storageAccount": "someAccount", "queueEndpointUrl": "https://foo.example.com:10001"},
343350
// expectedAccountKey: "myKey",
344-
expectedQueueName: "queue1",
345-
expectedQueueEndpointURL: "https://foo.example.com:10001",
346-
expectedPollingInterval: defaultPollingInterval,
347-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
351+
expectedQueueName: "queue1",
352+
expectedQueueEndpointURL: "https://foo.example.com:10001",
353+
expectedPollingInterval: defaultPollingInterval,
354+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
355+
expectedInitialVisibilityDelay: nil,
348356
},
349357
{
350358
name: "Empty TTL",
351359
properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: ""},
352360
// expectedAccountKey: "myKey",
353-
expectedQueueName: "queue1",
354-
expectedQueueEndpointURL: "",
355-
expectedPollingInterval: defaultPollingInterval,
356-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
361+
expectedQueueName: "queue1",
362+
expectedQueueEndpointURL: "",
363+
expectedPollingInterval: defaultPollingInterval,
364+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
365+
expectedInitialVisibilityDelay: nil,
357366
},
358367
{
359368
name: "With TTL",
360369
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "1"},
361370
// expectedAccountKey: "myKey",
362-
expectedQueueName: "queue1",
363-
expectedTTL: &oneSecondDuration,
364-
expectedQueueEndpointURL: "",
365-
expectedPollingInterval: defaultPollingInterval,
366-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
371+
expectedQueueName: "queue1",
372+
expectedTTL: &oneSecondDuration,
373+
expectedQueueEndpointURL: "",
374+
expectedPollingInterval: defaultPollingInterval,
375+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
376+
expectedInitialVisibilityDelay: nil,
367377
},
368378
{
369-
name: "With visibility timeout",
370-
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "visibilityTimeout": "5s"},
371-
expectedQueueName: "queue1",
372-
expectedPollingInterval: defaultPollingInterval,
373-
expectedVisibilityTimeout: ptr.Of(5 * time.Second),
379+
name: "With visibility timeout",
380+
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "visibilityTimeout": "5s"},
381+
expectedQueueName: "queue1",
382+
expectedPollingInterval: defaultPollingInterval,
383+
expectedVisibilityTimeout: ptr.Of(5 * time.Second),
384+
expectedInitialVisibilityDelay: nil,
374385
},
375386
{
376387
name: "With polling interval",
377388
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "pollingInterval": "2s"},
378389
// expectedAccountKey: "myKey",
379-
expectedQueueName: "queue1",
380-
expectedQueueEndpointURL: "",
381-
expectedPollingInterval: 2 * time.Second,
382-
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
390+
expectedQueueName: "queue1",
391+
expectedQueueEndpointURL: "",
392+
expectedPollingInterval: 2 * time.Second,
393+
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
394+
expectedInitialVisibilityDelay: nil,
383395
},
384396
}
385397

@@ -448,3 +460,31 @@ func TestParseMetadataWithInvalidTTL(t *testing.T) {
448460
})
449461
}
450462
}
463+
464+
func TestWriteWithInitialVisibilityDelay(t *testing.T) {
465+
mm := new(MockHelper)
466+
expectedDelay := 5 * time.Second
467+
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
468+
return in == nil
469+
}), mock.MatchedBy(func(in *time.Duration) bool {
470+
return in != nil && *in == expectedDelay
471+
})).Return(nil)
472+
473+
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
474+
475+
m := bindings.Metadata{}
476+
m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"}
477+
478+
err := a.Init(t.Context(), m)
479+
require.NoError(t, err)
480+
481+
r := bindings.InvokeRequest{
482+
Data: []byte("This is my message"),
483+
Metadata: map[string]string{"initialVisibilityDelay": "5s"},
484+
}
485+
486+
_, err = a.Invoke(t.Context(), &r)
487+
488+
require.NoError(t, err)
489+
require.NoError(t, a.Close())
490+
}

0 commit comments

Comments
 (0)