Skip to content

Commit cdbbfc5

Browse files
authored
Add support for super stream key routing (#270)
* Add support for super stream key routing --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 0f5cd64 commit cdbbfc5

File tree

12 files changed

+533
-14
lines changed

12 files changed

+533
-14
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,12 @@ public async Task<PartitionsQueryResponse> QueryPartition(string superStream)
341341
new PartitionsQueryRequest(corr, superStream)).ConfigureAwait(false);
342342
}
343343

344+
public async Task<RouteQueryResponse> QueryRoute(string superStream, string routingKey)
345+
{
346+
return await Request<RouteQueryRequest, RouteQueryResponse>(corr =>
347+
new RouteQueryRequest(corr, superStream, routingKey)).ConfigureAwait(false);
348+
}
349+
344350
private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, TimeSpan? timeout = null)
345351
where TIn : struct, ICommand where TOut : struct, ICommand
346352
{
@@ -526,7 +532,10 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
526532
PartitionsQueryResponse.Read(frame, out var partitionsQueryResponse);
527533
HandleCorrelatedResponse(partitionsQueryResponse);
528534
break;
529-
535+
case RouteQueryResponse.Key:
536+
RouteQueryResponse.Read(frame, out var routeQueryResponse);
537+
HandleCorrelatedResponse(routeQueryResponse);
538+
break;
530539
default:
531540
if (MemoryMarshal.TryGetArray(frame.First, out var segment))
532541
{

RabbitMQ.Stream.Client/ClientExceptions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,15 @@ public OffsetNotFoundException(string s)
8282
{
8383
}
8484
}
85+
86+
// RouteNotFoundException the exception for super stream publish
87+
// RouteNotFoundException is raised when the message can't be routed to any stream.
88+
// In this case the user will receive a timeout error and this exception is raised
89+
public class RouteNotFoundException : ProtocolException
90+
{
91+
public RouteNotFoundException(string s)
92+
: base(s)
93+
{
94+
}
95+
}
8596
}

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ RabbitMQ.Stream.Client.Hash.Murmur3.Murmur3(uint seed) -> void
317317
RabbitMQ.Stream.Client.Hash.Murmur3.Seed.get -> uint
318318
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy
319319
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.HashRoutingMurmurStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor) -> void
320-
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Collections.Generic.List<string>
321320
RabbitMQ.Stream.Client.HeartBeatHandler
322321
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
323322
RabbitMQ.Stream.Client.IClient
@@ -390,7 +389,6 @@ RabbitMQ.Stream.Client.IRouting.ValidateDns.get -> bool
390389
RabbitMQ.Stream.Client.IRouting.ValidateDns.set -> void
391390
RabbitMQ.Stream.Client.IRoutingConfiguration
392391
RabbitMQ.Stream.Client.IRoutingStrategy
393-
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Collections.Generic.List<string>
394392
RabbitMQ.Stream.Client.LeaderLocator
395393
RabbitMQ.Stream.Client.LeaderLocator.LeaderLocator() -> void
396394
RabbitMQ.Stream.Client.LeaderNotFoundException

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1+
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
12
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
23
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
34
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
45
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
56
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
7+
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
68
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
9+
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
710
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
811
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
12+
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
13+
RabbitMQ.Stream.Client.KeyRoutingStrategy
14+
RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>> routingKeyQFunc, string superStream) -> void
15+
RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
16+
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
17+
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
918
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
1019
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
1120
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
@@ -16,6 +25,21 @@ RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, Rab
1625
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
1726
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
1827
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
28+
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
29+
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
30+
RabbitMQ.Stream.Client.RouteNotFoundException
31+
RabbitMQ.Stream.Client.RouteNotFoundException.RouteNotFoundException(string s) -> void
32+
RabbitMQ.Stream.Client.RouteQueryResponse
33+
RabbitMQ.Stream.Client.RouteQueryResponse.CorrelationId.get -> uint
34+
RabbitMQ.Stream.Client.RouteQueryResponse.ResponseCode.get -> RabbitMQ.Stream.Client.ResponseCode
35+
RabbitMQ.Stream.Client.RouteQueryResponse.RouteQueryResponse() -> void
36+
RabbitMQ.Stream.Client.RouteQueryResponse.RouteQueryResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.List<string> streams) -> void
37+
RabbitMQ.Stream.Client.RouteQueryResponse.SizeNeeded.get -> int
38+
RabbitMQ.Stream.Client.RouteQueryResponse.Streams.get -> System.Collections.Generic.List<string>
39+
RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span<byte> span) -> int
40+
RabbitMQ.Stream.Client.RoutingStrategyType
41+
RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType
42+
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
1943
RabbitMQ.Stream.Client.StreamStats
2044
RabbitMQ.Stream.Client.StreamStats.CommittedChunkId() -> ulong
2145
RabbitMQ.Stream.Client.StreamStats.FirstOffset() -> ulong

RabbitMQ.Stream.Client/RawSuperStreamProducer.cs

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,13 @@ private RawSuperStreamProducer(
6363
_config = config;
6464
_streamInfos = streamInfos;
6565
_clientParameters = clientParameters;
66-
_defaultRoutingConfiguration.RoutingStrategy = new HashRoutingMurmurStrategy(_config.Routing);
66+
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
67+
{
68+
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
69+
_config.Client.QueryRoute, _config.SuperStream),
70+
RoutingStrategyType.Hash => new HashRoutingMurmurStrategy(_config.Routing),
71+
_ => new HashRoutingMurmurStrategy(_config.Routing)
72+
};
6773
_logger = logger ?? NullLogger<RawSuperStreamProducer>.Instance;
6874
}
6975

@@ -122,7 +128,8 @@ private RawProducerConfig FromStreamConfig(string stream)
122128
// The producer is created on demand when a message is sent to a stream
123129
private async Task<IProducer> InitProducer(string stream)
124130
{
125-
var p = await RawProducer.Create(_clientParameters, FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false);
131+
var p = await RawProducer.Create(_clientParameters, FromStreamConfig(stream), _streamInfos[stream], _logger)
132+
.ConfigureAwait(false);
126133
_logger?.LogDebug("Producer {ProducerReference} created for Stream {StreamIdentifier}", _config.Reference,
127134
stream);
128135
return p;
@@ -147,8 +154,16 @@ private async Task<IProducer> GetProducerForMessage(Message message)
147154
throw new ObjectDisposedException(nameof(RawSuperStreamProducer));
148155
}
149156

150-
var routes = _defaultRoutingConfiguration.RoutingStrategy.Route(message,
151-
_streamInfos.Keys.ToList());
157+
var routes = await _defaultRoutingConfiguration.RoutingStrategy.Route(message,
158+
_streamInfos.Keys.ToList()).ConfigureAwait(false);
159+
160+
// we should always have a route
161+
// but in case of stream KEY the routing could not exist
162+
if (routes is not { Count: > 0 })
163+
{
164+
throw new RouteNotFoundException("No route found for the message to any stream");
165+
}
166+
152167
return await GetProducer(routes[0]).ConfigureAwait(false);
153168
}
154169

@@ -263,6 +278,12 @@ public void Dispose()
263278
public int PendingCount => _producers.Sum(x => x.Value.PendingCount);
264279
}
265280

281+
public enum RoutingStrategyType
282+
{
283+
Hash,
284+
Key,
285+
}
286+
266287
public record RawSuperStreamProducerConfig : IProducerConfig
267288
{
268289
public RawSuperStreamProducerConfig(string superStream)
@@ -285,6 +306,8 @@ public RawSuperStreamProducerConfig(string superStream)
285306
public Func<Message, string> Routing { get; set; } = null;
286307
public Action<(string, Confirmation)> ConfirmHandler { get; set; } = _ => { };
287308

309+
public RoutingStrategyType RoutingStrategyType { get; set; } = RoutingStrategyType.Hash;
310+
288311
internal Client Client { get; set; }
289312
}
290313

@@ -306,7 +329,7 @@ internal class DefaultRoutingConfiguration : IRoutingConfiguration
306329
/// </summary>
307330
public interface IRoutingStrategy
308331
{
309-
List<string> Route(Message message, List<string> partitions);
332+
Task<List<string>> Route(Message message, List<string> partitions);
310333
}
311334

312335
/// <summary>
@@ -322,16 +345,53 @@ public class HashRoutingMurmurStrategy : IRoutingStrategy
322345
private const int Seed = 104729; // must be the same to all the clients to be compatible
323346

324347
// Routing based on the Murmur hash function
325-
public List<string> Route(Message message, List<string> partitions)
348+
public Task<List<string>> Route(Message message, List<string> partitions)
326349
{
327350
var key = _routingKeyExtractor(message);
328351
var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key));
329352
var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count;
330-
return new List<string>() { partitions[(int)index] };
353+
var r = new List<string>() { partitions[(int)index] };
354+
return Task.FromResult(r);
331355
}
332356

333357
public HashRoutingMurmurStrategy(Func<Message, string> routingKeyExtractor)
334358
{
335359
_routingKeyExtractor = routingKeyExtractor;
336360
}
337361
}
362+
363+
/// <summary>
364+
/// KeyRoutingStrategy is a routing strategy that uses the routing key to route messages to streams.
365+
/// </summary>
366+
public class KeyRoutingStrategy : IRoutingStrategy
367+
{
368+
private readonly Func<Message, string> _routingKeyExtractor;
369+
private readonly Func<string, string, Task<RouteQueryResponse>> _routingKeyQFunc;
370+
private readonly string _superStream;
371+
private readonly Dictionary<string, List<string>> _cacheStream = new();
372+
373+
public async Task<List<string>> Route(Message message, List<string> partitions)
374+
{
375+
var key = _routingKeyExtractor(message);
376+
// If the stream is already in the cache we return it
377+
// to avoid a query to the server for each send
378+
if (_cacheStream.TryGetValue(key, out var value))
379+
{
380+
return value;
381+
}
382+
383+
var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false);
384+
_cacheStream[key] = c.Streams;
385+
return (from resultStream in c.Streams
386+
where partitions.Contains(resultStream)
387+
select new List<string>() { resultStream }).FirstOrDefault();
388+
}
389+
390+
public KeyRoutingStrategy(Func<Message, string> routingKeyExtractor,
391+
Func<string, string, Task<RouteQueryResponse>> routingKeyQFunc, string superStream)
392+
{
393+
_routingKeyExtractor = routingKeyExtractor;
394+
_routingKeyQFunc = routingKeyQFunc;
395+
_superStream = superStream;
396+
}
397+
}

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public record SuperStreamConfig
1616
{
1717
public bool Enabled { get; init; } = true;
1818
public Func<Message, string> Routing { get; set; }
19+
20+
public RoutingStrategyType RoutingStrategyType { get; set; } = RoutingStrategyType.Hash;
1921
}
2022

2123
[AttributeUsage(AttributeTargets.Method)]
@@ -257,6 +259,12 @@ internal async ValueTask SendInternal(ulong publishingId, Message message)
257259
}
258260
}
259261

262+
// see the RouteNotFoundException comment
263+
catch (RouteNotFoundException)
264+
{
265+
throw;
266+
}
267+
260268
catch (Exception e)
261269
{
262270
_logger?.LogError(e, "Error sending message. " +
@@ -291,6 +299,12 @@ public async ValueTask Send(List<Message> messages, CompressionType compressionT
291299
}
292300
}
293301

302+
// see the RouteNotFoundException comment
303+
catch (RouteNotFoundException)
304+
{
305+
throw;
306+
}
307+
294308
catch (Exception e)
295309
{
296310
_logger?.LogError(e, "Error sending sub-batch messages. " +
@@ -346,6 +360,12 @@ public async ValueTask Send(List<Message> messages)
346360
}
347361
}
348362

363+
// see the RouteNotFoundException comment
364+
catch (RouteNotFoundException)
365+
{
366+
throw;
367+
}
368+
349369
catch (Exception e)
350370
{
351371
_logger?.LogError(e, "Error sending messages. " +

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ private async Task<IProducer> SuperStreamProducer()
3939
MessagesBufferSize = _producerConfig.MessagesBufferSize,
4040
MaxInFlight = _producerConfig.MaxInFlight,
4141
Routing = _producerConfig.SuperStreamConfig.Routing,
42+
RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType,
4243
ConfirmHandler = confirmationHandler =>
4344
{
4445
var (stream, confirmation) = confirmationHandler;
@@ -77,7 +78,10 @@ private async Task<IProducer> StandardProducer()
7778
_producerConfig.StreamSystem).WaitAsync(CancellationToken.None);
7879
});
7980
},
80-
ConnectionClosedHandler = async _ => { await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false); },
81+
ConnectionClosedHandler = async _ =>
82+
{
83+
await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false);
84+
},
8185
ConfirmHandler = confirmation =>
8286
{
8387
var confirmationStatus = confirmation.Code switch
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using System;
6+
7+
namespace RabbitMQ.Stream.Client;
8+
9+
internal readonly struct RouteQueryRequest : ICommand
10+
{
11+
public const ushort Key = 0x0018;
12+
private readonly string _superStream;
13+
private readonly string _routingKey;
14+
private readonly uint _corrId;
15+
16+
public RouteQueryRequest(uint corrId, string superStream, string routingKey)
17+
{
18+
_corrId = corrId;
19+
_superStream = superStream;
20+
_routingKey = routingKey;
21+
}
22+
23+
public int SizeNeeded => 2 + 2 + 4 +
24+
WireFormatting.StringSize(_superStream) +
25+
WireFormatting.StringSize(_routingKey);
26+
27+
public int Write(Span<byte> span)
28+
{
29+
var command = (ICommand)this;
30+
var offset = WireFormatting.WriteUInt16(span, Key);
31+
offset += WireFormatting.WriteUInt16(span[offset..], command.Version);
32+
offset += WireFormatting.WriteUInt32(span[offset..], _corrId);
33+
offset += WireFormatting.WriteString(span[offset..], _routingKey);
34+
offset += WireFormatting.WriteString(span[offset..], _superStream);
35+
return offset;
36+
}
37+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using System;
6+
using System.Buffers;
7+
using System.Collections.Generic;
8+
9+
namespace RabbitMQ.Stream.Client;
10+
11+
public struct RouteQueryResponse : ICommand
12+
{
13+
public const ushort Key = 0x0018;
14+
15+
public RouteQueryResponse(uint correlationId, ResponseCode responseCode, List<string> streams)
16+
{
17+
Streams = streams;
18+
ResponseCode = responseCode;
19+
CorrelationId = correlationId;
20+
}
21+
22+
public List<string> Streams { get; }
23+
public int SizeNeeded => throw new NotImplementedException();
24+
public int Write(Span<byte> span) => throw new NotImplementedException();
25+
26+
public uint CorrelationId { get; }
27+
public ResponseCode ResponseCode { get; }
28+
29+
internal static int Read(ReadOnlySequence<byte> frame, out RouteQueryResponse command)
30+
{
31+
var offset = WireFormatting.ReadUInt16(frame, out _);
32+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out _);
33+
offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var correlationId);
34+
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var responseCode);
35+
offset += WireFormatting.ReadUInt32(frame.Slice(offset), out var numOfStreams);
36+
37+
var streams = new List<string>();
38+
for (var i = 0; i < numOfStreams; i++)
39+
{
40+
offset += WireFormatting.ReadString(frame.Slice(offset), out var stream);
41+
streams.Add(stream);
42+
}
43+
44+
command = new RouteQueryResponse(correlationId, (ResponseCode)responseCode, streams);
45+
return offset;
46+
}
47+
}

0 commit comments

Comments
 (0)