Skip to content

Commit f70571a

Browse files
authored
[Event Hubs Trigger] Work around service contract (Azure#50645)
* [Event Hubs Trigger] Work around service contract The focus of these changes is to work around the service contract for `offset` which is a string that previously was always numeric. In support of the GeoDR feature, the service no longer returns a numeric offset in all scenarios, sometimes returning a format that cannot be parsed as a number. To avoid applications that made assumptions about the format, the `Offset` binding contracts for the trigger were updated to always return a numeric value as the string. When the service returns an offset that cannot be parsed as a number, the value `-1` is substituted. If used for readers, this would possibly result in reading at the beginning of the stream, causing duplicate processing of events but will ensure there is no data loss. Because the `Offset` binding is not longer guaranteed to represent the actual offset, a new `OffsetString` binding was introduced to surface the actual offset value.
1 parent d8315dc commit f70571a

File tree

4 files changed

+76
-16
lines changed

4 files changed

+76
-16
lines changed

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
# Release History
22

3-
## 6.5.2 (2025-06-12)
3+
## 6.5.2 (2025-06-16)
44

55
### Bugs Fixed
66

77
- Fixed a bug where the data types of broker-owned properties were being adjusted when an event was read by the client, causing the underlying AMQP data to be mutated. This resulted in binary changes when the AMQP message was serialized and unintentionally altered the service contract. Going forward, the original data types will be preserved on the AMQP representation of the message and type normalization only applied to the .NET `EventData` projection.
8-
98
### Other Changes
109

1110
- Updated the `Microsoft.Azure.Amqp` dependency to 2.7.0, which contains several bug fixes and adds support for AOT. _(see: [commits](https://github.com/Azure/azure-amqp/commits/hotfix/))_
11+
12+
- Added a compatibility hack to the binding data associated with the trigger to preserve the previous service behavior of `Offset` being a valid numeric value. As this data is not local to the trigger context and is surfaced upstream, the change in service behavior for GeoDR namespaces caused issues with some custom language workers making invalid assumptions about the string-based offset format. To work around this, the `Offset` is being populated with `OffsetString` if it can be parsed as a number. In the case that `OffsetString` is not a valid numeric value, it will default to `-1` which would position readers at the beginning of the stream, causing duplicates but preventing data loss. A key for `OffsetString` was added that reflects the actual offset returned by the service in all scenarios.
1213

1314
## 6.5.1 (2025-04-09)
1415

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public Dictionary<string, Type> GetBindingContract(bool isSingleDispatch = true)
5454

5555
AddBindingContractMember(contract, "PartitionKey", typeof(string), isSingleDispatch);
5656
AddBindingContractMember(contract, "Offset", typeof(string), isSingleDispatch);
57+
AddBindingContractMember(contract, "OffsetString", typeof(string), isSingleDispatch);
5758
AddBindingContractMember(contract, "SequenceNumber", typeof(long), isSingleDispatch);
5859
AddBindingContractMember(contract, "EnqueuedTimeUtc", typeof(DateTime), isSingleDispatch);
5960
AddBindingContractMember(contract, "Properties", typeof(IDictionary<string, object>), isSingleDispatch);
@@ -96,25 +97,36 @@ public Dictionary<string, object> GetBindingData(EventHubTriggerInput value)
9697

9798
internal static void AddBindingData(Dictionary<string, object> bindingData, EventData[] events)
9899
{
99-
int length = events.Length;
100+
var length = events.Length;
100101
var partitionKeys = new string[length];
101102
var offsets = new string[length];
103+
var offsetStrings = new string[length];
102104
var sequenceNumbers = new long[length];
103105
var enqueuedTimesUtc = new DateTime[length];
104106
var properties = new IDictionary<string, object>[length];
105107
var systemProperties = new IDictionary<string, object>[length];
106108

107109
SafeAddValue(() => bindingData.Add("PartitionKeyArray", partitionKeys));
108110
SafeAddValue(() => bindingData.Add("OffsetArray", offsets));
111+
SafeAddValue(() => bindingData.Add("OffsetStringArray", offsetStrings));
109112
SafeAddValue(() => bindingData.Add("SequenceNumberArray", sequenceNumbers));
110113
SafeAddValue(() => bindingData.Add("EnqueuedTimeUtcArray", enqueuedTimesUtc));
111114
SafeAddValue(() => bindingData.Add("PropertiesArray", properties));
112115
SafeAddValue(() => bindingData.Add("SystemPropertiesArray", systemProperties));
113116

114117
for (int i = 0; i < events.Length; i++)
115118
{
119+
if (!long.TryParse(events[i].OffsetString, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offset))
120+
{
121+
// Default to "beginning of stream" if parsing fails. This will result in duplicates,
122+
// but ensures no data loss.
123+
124+
offset = -1;
125+
}
126+
116127
partitionKeys[i] = events[i].PartitionKey;
117-
offsets[i] = events[i].OffsetString;
128+
offsets[i] = offset.ToString(CultureInfo.InvariantCulture);
129+
offsetStrings[i] = events[i].OffsetString;
118130
sequenceNumbers[i] = events[i].SequenceNumber;
119131
enqueuedTimesUtc[i] = events[i].EnqueuedTime.DateTime;
120132
properties[i] = events[i].Properties;
@@ -124,8 +136,17 @@ internal static void AddBindingData(Dictionary<string, object> bindingData, Even
124136

125137
private static void AddBindingData(Dictionary<string, object> bindingData, EventData eventData)
126138
{
139+
if (!long.TryParse(eventData.OffsetString, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offset))
140+
{
141+
// Default to "beginning of stream" if parsing fails. This will result in duplicates,
142+
// but ensures no data loss.
143+
144+
offset = -1;
145+
}
146+
127147
SafeAddValue(() => bindingData.Add("PartitionKey", eventData.PartitionKey));
128-
SafeAddValue(() => bindingData.Add("Offset", eventData.OffsetString));
148+
SafeAddValue(() => bindingData.Add("Offset", offset.ToString(CultureInfo.InvariantCulture)));
149+
SafeAddValue(() => bindingData.Add("OffsetString", eventData.OffsetString));
129150
SafeAddValue(() => bindingData.Add("SequenceNumber", eventData.SequenceNumber));
130151
SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", eventData.EnqueuedTime.DateTime));
131152
SafeAddValue(() => bindingData.Add("Properties", eventData.Properties));
@@ -154,8 +175,18 @@ private static IDictionary<string, object> GetSystemPropertiesForBinding(EventDa
154175
}
155176

156177
// Following is needed to maintain structure of bindingdata: https://github.com/Azure/azure-webjobs-sdk/pull/1849
178+
179+
if (!long.TryParse(eventData.OffsetString, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offset))
180+
{
181+
// Default to "beginning of stream" if parsing fails. This will result in duplicates,
182+
// but ensures no data loss.
183+
184+
offset = -1;
185+
}
186+
157187
modifiedDictionary["SequenceNumber"] = eventData.SequenceNumber;
158-
modifiedDictionary["Offset"] = eventData.OffsetString;
188+
modifiedDictionary["Offset"] = offset;
189+
modifiedDictionary["OffsetString"] = eventData.OffsetString;
159190
modifiedDictionary["PartitionKey"] = eventData.PartitionKey;
160191
modifiedDictionary["EnqueuedTimeUtc"] = eventData.EnqueuedTime.DateTime;
161192
return modifiedDictionary;

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,19 @@ public Dictionary<string, string> GetTriggerDetails(EventProcessorPartition cont
6363
return new Dictionary<string, string>();
6464
}
6565

66-
string offset, enqueueTimeUtc, sequenceNumber;
66+
string offset, offsetString, enqueueTimeUtc, sequenceNumber;
6767
if (IsSingleDispatch)
6868
{
69-
offset = Events[0].OffsetString;
69+
if (!long.TryParse(Events[0].OffsetString, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offsetLong))
70+
{
71+
// Default to "beginning of stream" if parsing fails. This will result in duplicates,
72+
// but ensures no data loss.
73+
74+
offsetLong = -1;
75+
}
76+
77+
offset = offsetLong.ToString(CultureInfo.InvariantCulture);
78+
offsetString = Events[0].OffsetString;
7079
enqueueTimeUtc = Events[0].EnqueuedTime.ToString("o", CultureInfo.InvariantCulture);
7180
sequenceNumber = Events[0].SequenceNumber.ToString(CultureInfo.InvariantCulture);
7281
}
@@ -76,18 +85,26 @@ public Dictionary<string, string> GetTriggerDetails(EventProcessorPartition cont
7685
EventData last = Events[Events.Length - 1];
7786

7887
offset = $"{first.OffsetString}-{last.OffsetString}";
88+
offsetString = $"{first.OffsetString}-{last.OffsetString}";
7989
enqueueTimeUtc = $"{first.EnqueuedTime.ToString("o", CultureInfo.InvariantCulture)}-{last.EnqueuedTime.ToString("o", CultureInfo.InvariantCulture)}";
8090
sequenceNumber = $"{first.SequenceNumber}-{last.SequenceNumber}";
8191
}
8292

8393
return new Dictionary<string, string>()
8494
{
8595
{ "PartitionId", context.PartitionId },
86-
{ "Offset", offset },
96+
{ "OffsetString", offsetString },
8797
{ "EnqueueTimeUtc", enqueueTimeUtc },
8898
{ "SequenceNumber", sequenceNumber },
8999
{ "Count", Events.Length.ToString(CultureInfo.InvariantCulture)},
90100

101+
// Preserve a numeric offset for compatibility with existing code. It is immportant
102+
// to note that this does not conform to the Event Hubs service contract and may not
103+
// reflect the actual offset of the event in the stream. In the case that the
104+
// offset is not a valid numeric value, it will default to -1 which would position
105+
// readers at the beginning of the stream, causing duplicates but preventing data loss.
106+
{ "Offset", offset },
107+
91108
// Preserve a misspelling that existed in the original code, as
92109
// there may be applications relying on this.
93110
{ "PartionId", context.PartitionId }

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Globalization;
67
using System.Text;
78
using Azure.Messaging.EventHubs;
89
using Azure.Messaging.EventHubs.Consumer;
@@ -26,10 +27,11 @@ public void GetStaticBindingContract_ReturnsExpectedValue()
2627
var strategy = new EventHubTriggerBindingStrategy();
2728
var contract = strategy.GetBindingContract();
2829

29-
Assert.AreEqual(8, contract.Count);
30+
Assert.AreEqual(9, contract.Count);
3031
Assert.AreEqual(typeof(TriggerPartitionContext), contract["TriggerPartitionContext"]);
3132
Assert.AreEqual(typeof(PartitionContext), contract["PartitionContext"]);
3233
Assert.AreEqual(typeof(string), contract["Offset"]);
34+
Assert.AreEqual(typeof(string), contract["OffsetString"]);
3335
Assert.AreEqual(typeof(long), contract["SequenceNumber"]);
3436
Assert.AreEqual(typeof(DateTime), contract["EnqueuedTimeUtc"]);
3537
Assert.AreEqual(typeof(IDictionary<string, object>), contract["Properties"]);
@@ -42,10 +44,11 @@ public void GetBindingContract_SingleDispatch_ReturnsExpectedValue()
4244
var strategy = new EventHubTriggerBindingStrategy();
4345
var contract = strategy.GetBindingContract(true);
4446

45-
Assert.AreEqual(8, contract.Count);
47+
Assert.AreEqual(9, contract.Count);
4648
Assert.AreEqual(typeof(TriggerPartitionContext), contract["TriggerPartitionContext"]);
4749
Assert.AreEqual(typeof(PartitionContext), contract["PartitionContext"]);
4850
Assert.AreEqual(typeof(string), contract["Offset"]);
51+
Assert.AreEqual(typeof(string), contract["OffsetString"]);
4952
Assert.AreEqual(typeof(long), contract["SequenceNumber"]);
5053
Assert.AreEqual(typeof(DateTime), contract["EnqueuedTimeUtc"]);
5154
Assert.AreEqual(typeof(IDictionary<string, object>), contract["Properties"]);
@@ -58,11 +61,12 @@ public void GetBindingContract_MultipleDispatch_ReturnsExpectedValue()
5861
var strategy = new EventHubTriggerBindingStrategy();
5962
var contract = strategy.GetBindingContract(false);
6063

61-
Assert.AreEqual(8, contract.Count);
64+
Assert.AreEqual(9, contract.Count);
6265
Assert.AreEqual(typeof(TriggerPartitionContext), contract["TriggerPartitionContext"]);
6366
Assert.AreEqual(typeof(PartitionContext), contract["PartitionContext"]);
6467
Assert.AreEqual(typeof(string[]), contract["PartitionKeyArray"]);
6568
Assert.AreEqual(typeof(string[]), contract["OffsetArray"]);
69+
Assert.AreEqual(typeof(string[]), contract["OffsetStringArray"]);
6670
Assert.AreEqual(typeof(long[]), contract["SequenceNumberArray"]);
6771
Assert.AreEqual(typeof(DateTime[]), contract["EnqueuedTimeUtcArray"]);
6872
Assert.AreEqual(typeof(IDictionary<string, object>[]), contract["PropertiesArray"]);
@@ -80,18 +84,24 @@ public void GetBindingData_SingleDispatch_ReturnsExpectedValue()
8084
var strategy = new EventHubTriggerBindingStrategy();
8185
var bindingData = strategy.GetBindingData(input);
8286

83-
Assert.AreEqual(8, bindingData.Count);
87+
if (!long.TryParse(evt.OffsetString, NumberStyles.Integer, CultureInfo.InvariantCulture, out long offsetLong))
88+
{
89+
offsetLong = -1;
90+
}
91+
92+
Assert.AreEqual(9, bindingData.Count);
8493
Assert.AreSame(input.ProcessorPartition.PartitionContext, bindingData["TriggerPartitionContext"]);
8594
Assert.AreSame(input.ProcessorPartition.PartitionContext, bindingData["PartitionContext"]);
8695
Assert.AreEqual(evt.PartitionKey, bindingData["PartitionKey"]);
87-
Assert.AreEqual(evt.OffsetString, bindingData["Offset"]);
96+
Assert.AreEqual(offsetLong.ToString(CultureInfo.InvariantCulture), bindingData["Offset"]);
97+
Assert.AreEqual(evt.OffsetString, bindingData["OffsetString"]);
8898
Assert.AreEqual(evt.SequenceNumber, bindingData["SequenceNumber"]);
8999
Assert.AreEqual(evt.EnqueuedTime.DateTime, bindingData["EnqueuedTimeUtc"]);
90100
Assert.AreSame(evt.Properties, bindingData["Properties"]);
91101
IDictionary<string, object> bindingDataSysProps = bindingData["SystemProperties"] as Dictionary<string, object>;
92102
Assert.NotNull(bindingDataSysProps);
93103
Assert.AreEqual(bindingDataSysProps["PartitionKey"], bindingData["PartitionKey"]);
94-
Assert.AreEqual(bindingDataSysProps["Offset"], bindingData["Offset"]);
104+
Assert.AreEqual(offsetLong.ToString(CultureInfo.InvariantCulture), bindingData["Offset"]);
95105
Assert.AreEqual(bindingDataSysProps["SequenceNumber"], bindingData["SequenceNumber"]);
96106
Assert.AreEqual(bindingDataSysProps["EnqueuedTimeUtc"], bindingData["EnqueuedTimeUtc"]);
97107
Assert.AreEqual(bindingDataSysProps["iothub-connection-device-id"], "testDeviceId");
@@ -123,13 +133,14 @@ public void GetBindingData_MultipleDispatch_ReturnsExpectedValue()
123133
var strategy = new EventHubTriggerBindingStrategy();
124134
var bindingData = strategy.GetBindingData(input);
125135

126-
Assert.AreEqual(8, bindingData.Count);
136+
Assert.AreEqual(9, bindingData.Count);
127137
Assert.AreSame(input.ProcessorPartition.PartitionContext, bindingData["TriggerPartitionContext"]);
128138
Assert.AreSame(input.ProcessorPartition.PartitionContext, bindingData["PartitionContext"]);
129139

130140
// verify an array was created for each binding data type
131141
Assert.AreEqual(events.Length, ((string[])bindingData["PartitionKeyArray"]).Length);
132142
Assert.AreEqual(events.Length, ((string[])bindingData["OffsetArray"]).Length);
143+
Assert.AreEqual(events.Length, ((string[])bindingData["OffsetStringArray"]).Length);
133144
Assert.AreEqual(events.Length, ((long[])bindingData["SequenceNumberArray"]).Length);
134145
Assert.AreEqual(events.Length, ((DateTime[])bindingData["EnqueuedTimeUtcArray"]).Length);
135146
Assert.AreEqual(events.Length, ((IDictionary<string, object>[])bindingData["PropertiesArray"]).Length);

0 commit comments

Comments
 (0)