Skip to content

Commit 4c76208

Browse files
authored
[Event Hubs] Buffered Producer Partition Assignment (Azure#25081)
* [Event Hubs] Buffered Producer Partition Assignment The focus of these changes is to implement the partition assignment operations for the `EventHubBufferedProducerClient`. Partition key hashing behavior was ported directly from the code used by the Event Hubs service and left as close to its original implementation as possible.
1 parent 2e5424e commit 4c76208

File tree

4 files changed

+693
-71
lines changed

4 files changed

+693
-71
lines changed
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Diagnostics.CodeAnalysis;
6+
using System.Text;
7+
using System.Threading;
8+
9+
namespace Azure.Messaging.EventHubs.Core
10+
{
11+
/// <summary>
///   Allows events to be resolved to partitions using common patterns such as
///   round-robin assignment and hashing of partition keys.
/// </summary>
///
internal class PartitionResolver
{
    /// <summary>The index to use for automatic partition assignment.</summary>
    private int _partitionAssignmentIndex = -1;

    /// <summary>
    ///   Assigns a partition using a round-robin approach.
    /// </summary>
    ///
    /// <param name="partitions">The set of available partitions.</param>
    ///
    /// <returns>The member of <paramref name="partitions" /> that was selected.</returns>
    ///
    /// <exception cref="ArgumentNullException">Occurs when <paramref name="partitions" /> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">Occurs when <paramref name="partitions" /> is empty.</exception>
    ///
    public virtual string AssignRoundRobin(string[] partitions)
    {
        // Validate before touching the shared index so that a rejected call
        // does not consume a slot in the rotation.

        EnsurePartitionsAvailable(partitions);

        // Interlocked.Increment wraps from int.MaxValue to int.MinValue without
        // throwing, so no explicit unchecked context is needed for the rollover.

        var index = Interlocked.Increment(ref _partitionAssignmentIndex);

        // If the increment rolls over to a negative value, attempt to reset to 0.
        //
        // If the exchange is successful, the return from the call will match the
        // original overflow value; in this case, the local index can safely be set to 0.
        //
        // If the call returns another value, a different caller has reset the assignment
        // index, and another increment is needed to avoid duplication for the local index.
        //
        // It is possible (though incredibly unlikely) that the assignment index will overflow
        // negative again after the exchange/increment.  To guard against that scenario, the
        // reset is performed in a loop.
        //
        // Rolling over can create a slightly unfair distribution due to the sequence changing,
        // but avoids corner-case errors with negative values not aligning to a valid index range.

        while (index < 0)
        {
            var original = index;

            index = Interlocked.CompareExchange(ref _partitionAssignmentIndex, 0, index);
            index = (index == original) ? 0 : Interlocked.Increment(ref _partitionAssignmentIndex);
        }

        return partitions[index % partitions.Length];
    }

    /// <summary>
    ///   Assigns a partition using a hash-based approach based on the provided
    ///   <paramref name="partitionKey" />.
    /// </summary>
    ///
    /// <param name="partitionKey">The partition key to use as the basis for partition assignment.</param>
    /// <param name="partitions">The set of available partitions.</param>
    ///
    /// <returns>The member of <paramref name="partitions" /> that was selected.</returns>
    ///
    /// <exception cref="ArgumentNullException">Occurs when <paramref name="partitions" /> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">Occurs when <paramref name="partitions" /> is empty.</exception>
    ///
    public virtual string AssignForPartitionKey(string partitionKey,
                                                string[] partitions)
    {
        EnsurePartitionsAvailable(partitions);

        // The hash is a signed 16-bit value, so the remainder may be negative; its
        // magnitude is always smaller than the partition count, making Math.Abs safe.

        var hashValue = GenerateHashCode(partitionKey);
        return partitions[Math.Abs(hashValue % partitions.Length)];
    }

    /// <summary>
    ///   Ensures that a set of partitions contains at least one member to assign.
    /// </summary>
    ///
    /// <param name="partitions">The set of available partitions to validate.</param>
    ///
    /// <exception cref="ArgumentNullException">Occurs when <paramref name="partitions" /> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">Occurs when <paramref name="partitions" /> is empty.</exception>
    ///
    private static void EnsurePartitionsAvailable(string[] partitions)
    {
        if (partitions == null)
        {
            throw new ArgumentNullException(nameof(partitions));
        }

        if (partitions.Length == 0)
        {
            throw new ArgumentException("At least one partition must be available for assignment.", nameof(partitions));
        }
    }

    /// <summary>
    ///   Generates a hash code for a partition key using Jenkins' lookup3 algorithm.
    /// </summary>
    ///
    /// <param name="partitionKey">The partition key to generate a hash code for.</param>
    ///
    /// <returns>The generated hash code; <c>0</c> when <paramref name="partitionKey" /> is <c>null</c>.</returns>
    ///
    /// <remarks>
    ///   This implementation is a direct port of the Event Hubs service code; it is intended to match
    ///   the gateway hashing algorithm as closely as possible and should not be adjusted without careful
    ///   consideration.
    /// </remarks>
    ///
    private static short GenerateHashCode(string partitionKey)
    {
        if (partitionKey == null)
        {
            return 0;
        }

        ComputeHash(Encoding.UTF8.GetBytes(partitionKey), seed1: 0, seed2: 0, out uint hash1, out uint hash2);

        // Only the low 16 bits are kept; the narrowing must truncate rather than
        // throw, even when the assembly is compiled with checked arithmetic.

        unchecked
        {
            return (short)(hash1 ^ hash2);
        }
    }

    /// <summary>
    ///   Computes a hash value using Jenkins' lookup3 algorithm.
    /// </summary>
    ///
    /// <param name="data">The data to base the hash on.</param>
    /// <param name="seed1">A seed value for the first hash.</param>
    /// <param name="seed2">A seed value for the second hash.</param>
    /// <param name="hash1">The first computed hash for the <paramref name="data" />.</param>
    /// <param name="hash2">The second computed hash for the <paramref name="data" />.</param>
    ///
    /// <remarks>
    ///   This implementation is a direct port of the Event Hubs service code; it is intended to match
    ///   the gateway hashing algorithm as closely as possible and should not be adjusted without careful
    ///   consideration.
    ///
    ///   The 32-bit words are read as little-endian explicitly, matching the byte-wise handling of
    ///   the trailing bytes, so that the result does not vary with the architecture of the host.
    /// </remarks>
    ///
    private static void ComputeHash(byte[] data,
                                    uint seed1,
                                    uint seed2,
                                    out uint hash1,
                                    out uint hash2)
    {
        // The algorithm depends on 32-bit wrap-around for every addition and
        // subtraction; make that explicit so a checked build cannot throw.

        unchecked
        {
            uint a, b, c;

            a = b = c = (uint)(0xdeadbeef + data.Length + seed1);
            c += seed2;

            int index = 0, size = data.Length;

            // Consume the data in 12-byte groups, mixing after each, until at
            // most 12 bytes remain for the final step.

            while (size > 12)
            {
                a += ReadUInt32LittleEndian(data, index);
                b += ReadUInt32LittleEndian(data, index + 4);
                c += ReadUInt32LittleEndian(data, index + 8);

                a -= c;
                a ^= (c << 4) | (c >> 28);
                c += b;

                b -= a;
                b ^= (a << 6) | (a >> 26);
                a += c;

                c -= b;
                c ^= (b << 8) | (b >> 24);
                b += a;

                a -= c;
                a ^= (c << 16) | (c >> 16);
                c += b;

                b -= a;
                b ^= (a << 19) | (a >> 13);
                a += c;

                c -= b;
                c ^= (b << 4) | (b >> 28);
                b += a;

                index += 12;
                size -= 12;
            }

            // Fold in the remaining bytes; each case handles its highest byte
            // and then falls through to the next smaller case.

            switch (size)
            {
                case 12:
                    a += ReadUInt32LittleEndian(data, index);
                    b += ReadUInt32LittleEndian(data, index + 4);
                    c += ReadUInt32LittleEndian(data, index + 8);
                    break;
                case 11:
                    c += ((uint)data[index + 10]) << 16;
                    goto case 10;
                case 10:
                    c += ((uint)data[index + 9]) << 8;
                    goto case 9;
                case 9:
                    c += (uint)data[index + 8];
                    goto case 8;
                case 8:
                    b += ReadUInt32LittleEndian(data, index + 4);
                    a += ReadUInt32LittleEndian(data, index);
                    break;
                case 7:
                    b += ((uint)data[index + 6]) << 16;
                    goto case 6;
                case 6:
                    b += ((uint)data[index + 5]) << 8;
                    goto case 5;
                case 5:
                    b += (uint)data[index + 4];
                    goto case 4;
                case 4:
                    a += ReadUInt32LittleEndian(data, index);
                    break;
                case 3:
                    a += ((uint)data[index + 2]) << 16;
                    goto case 2;
                case 2:
                    a += ((uint)data[index + 1]) << 8;
                    goto case 1;
                case 1:
                    a += (uint)data[index];
                    break;
                case 0:
                    // Nothing left to fold in; the final mix is skipped.
                    hash1 = c;
                    hash2 = b;
                    return;
            }

            // Final mix of the three state words.

            c ^= b;
            c -= (b << 14) | (b >> 18);

            a ^= c;
            a -= (c << 11) | (c >> 21);

            b ^= a;
            b -= (a << 25) | (a >> 7);

            c ^= b;
            c -= (b << 16) | (b >> 16);

            a ^= c;
            a -= (c << 4) | (c >> 28);

            b ^= a;
            b -= (a << 14) | (a >> 18);

            c ^= b;
            c -= (b << 24) | (b >> 8);

            hash1 = c;
            hash2 = b;
        }
    }

    /// <summary>
    ///   Reads four bytes as an unsigned 32-bit integer, treating the first byte
    ///   as the least significant.
    /// </summary>
    ///
    /// <param name="data">The data to read from.</param>
    /// <param name="offset">The zero-based position of the first byte to read.</param>
    ///
    /// <returns>The little-endian value of the four bytes starting at <paramref name="offset" />.</returns>
    ///
    private static uint ReadUInt32LittleEndian(byte[] data,
                                               int offset)
    {
        return (uint)data[offset]
            | (((uint)data[offset + 1]) << 8)
            | (((uint)data[offset + 2]) << 16)
            | (((uint)data[offset + 3]) << 24);
    }
}
241+
}

0 commit comments

Comments
 (0)