Skip to content

Commit 1a6a75a

Browse files
authored
Enable in order processing of eventhubs messages (#3605)
Signed-off-by: yaron2 <[email protected]>
1 parent a00a853 commit 1a6a75a

File tree

5 files changed

+45
-13
lines changed

5 files changed

+45
-13
lines changed

bindings/azure/eventhubs/metadata.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,15 @@ builtinAuthenticationProfiles:
5555
default: "false"
5656
example: "false"
5757
description: |
58+
5859
Allow management of the Event Hub namespace and storage account.
60+
- name: enableInOrderMessageDelivery
61+
type: bool
62+
required: false
63+
default: "false"
64+
example: "false"
65+
description: |
66+
Enable in order processing of messages within a partition.
5967
- name: resourceGroupName
6068
type: string
6169
required: false

common/component/azure/eventhubs/eventhubs.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,11 @@ func (aeh *AzureEventHubs) processEvents(subscribeCtx context.Context, partition
393393

394394
if len(events) != 0 {
395395
// Handle received message
396-
go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)
396+
if aeh.metadata.EnableInOrderMessageDelivery {
397+
aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)
398+
} else {
399+
go aeh.handleAsync(subscribeCtx, config.Topic, events, config.Handler)
400+
}
397401

398402
// Checkpointing disabled for CheckPointFrequencyPerPartition == 0
399403
if config.CheckPointFrequencyPerPartition > 0 {

common/component/azure/eventhubs/eventhubs_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ func TestParseEventHubsMetadata(t *testing.T) {
7272
require.Error(t, err)
7373
require.ErrorContains(t, err, "one of connectionString or eventHubNamespace is required")
7474
})
75+
76+
t.Run("test in order delivery", func(t *testing.T) {
77+
metadata := map[string]string{
78+
"enableInOrderMessageDelivery": "true",
79+
"connectionString": "fake",
80+
}
81+
82+
m, err := parseEventHubsMetadata(metadata, false, testLogger)
83+
84+
require.NoError(t, err)
85+
require.True(t, m.EnableInOrderMessageDelivery)
86+
})
7587
}
7688

7789
func TestConstructConnectionStringFromTopic(t *testing.T) {

common/component/azure/eventhubs/metadata.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,19 @@ import (
2626
)
2727

2828
type AzureEventHubsMetadata struct {
29-
ConnectionString string `json:"connectionString" mapstructure:"connectionString"`
30-
EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"`
31-
ConsumerID string `json:"consumerID" mapstructure:"consumerID"`
32-
StorageConnectionString string `json:"storageConnectionString" mapstructure:"storageConnectionString"`
33-
StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"`
34-
StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"`
35-
StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"`
36-
EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"`
37-
MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"`
38-
PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"`
39-
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
40-
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
29+
ConnectionString string `json:"connectionString" mapstructure:"connectionString"`
30+
EventHubNamespace string `json:"eventHubNamespace" mapstructure:"eventHubNamespace"`
31+
ConsumerID string `json:"consumerID" mapstructure:"consumerID"`
32+
StorageConnectionString string `json:"storageConnectionString" mapstructure:"storageConnectionString"`
33+
StorageAccountName string `json:"storageAccountName" mapstructure:"storageAccountName"`
34+
StorageAccountKey string `json:"storageAccountKey" mapstructure:"storageAccountKey"`
35+
StorageContainerName string `json:"storageContainerName" mapstructure:"storageContainerName"`
36+
EnableEntityManagement bool `json:"enableEntityManagement,string" mapstructure:"enableEntityManagement"`
37+
MessageRetentionInDays int32 `json:"messageRetentionInDays,string" mapstructure:"messageRetentionInDays"`
38+
PartitionCount int32 `json:"partitionCount,string" mapstructure:"partitionCount"`
39+
SubscriptionID string `json:"subscriptionID" mapstructure:"subscriptionID"`
40+
ResourceGroupName string `json:"resourceGroupName" mapstructure:"resourceGroupName"`
41+
EnableInOrderMessageDelivery bool `json:"enableInOrderMessageDelivery,string" mapstructure:"enableInOrderMessageDelivery"`
4142

4243
// Binding only
4344
EventHub string `json:"eventHub" mapstructure:"eventHub" mdonly:"bindings"`

pubsub/azure/eventhubs/metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ builtinAuthenticationProfiles:
3535
example: "false"
3636
description: |
3737
Allow management of the Event Hub namespace and storage account.
38+
- name: enableInOrderMessageDelivery
39+
type: bool
40+
required: false
41+
default: "false"
42+
example: "false"
43+
description: |
44+
Enable in order processing of messages within a partition.
3845
3946
# The following four properties are needed only if enableEntityManagement is set to true
4047
- name: resourceGroupName

0 commit comments

Comments
 (0)