Skip to content

Commit 3516e97

Browse files
authored
Implement the super stream consumer Super Stream Consumer (#174)
Add IConsumer interface to be compliant to the Iproducer The base for Standard Consumer and Super-Stream Consumer Implement super-consumer low-level client Implement the super stream for Reliable part Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 55f78d8 commit 3516e97

15 files changed

+1076
-196
lines changed

Docker/Dockerfile

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ RUN wget https://packages.microsoft.com/config/ubuntu/21.04/packages-microsoft-p
99
RUN dpkg -i packages-microsoft-prod.deb
1010
RUN rm packages-microsoft-prod.deb
1111

12-
RUN apt-get update && \
13-
apt-get install -y apt-transport-https && \
14-
apt-get update && \
15-
apt-get install -y git && \
16-
apt-get install -y dotnet-sdk-6.0
12+
13+
RUN printf 'Package: *net*\nPin: origin packages.microsoft.com\nPin-Priority: 1001' >> /etc/apt/preferences
14+
15+
RUN apt-get update
16+
RUN apt install dotnet6 -y
17+
RUN apt install git -y
18+
1719

1820
RUN apt-get install make -y

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ run-test-in-docker:
2222
docker stop dotnet-test || true
2323
docker build -t stream-dotnet-test -f Docker/Dockerfile . && \
2424
docker run -d --rm --name dotnet-test -v $(shell pwd):/source --cpuset-cpus="1" stream-dotnet-test && \
25-
docker exec -it dotnet-test /bin/sh -c "cd /source && make" || true
25+
docker exec -it dotnet-test /bin/sh -c "cd /source && make test" || true

RabbitMQ.Stream.Client/Consumer.cs

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,8 @@ public ConsumerEvents(Func<Deliver, Task> deliverHandler,
3434
public Func<bool, Task<IOffsetType>> ConsumerUpdateHandler { get; }
3535
}
3636

37-
public record ConsumerConfig : INamedEntity
37+
public record ConsumerConfig : IConsumerConfig
3838
{
39-
// StoredOffsetSpec configuration it is needed to keep the offset spec.
40-
// since the offset can be decided from the ConsumerConfig.OffsetSpec.
41-
// and from ConsumerConfig.ConsumerUpdateListener.
42-
// needed also See also consumer:MaybeDispatch/1.
43-
// It is not public because it is not needed for the user.
44-
internal IOffsetType StoredOffsetSpec { get; set; }
45-
4639
internal void Validate()
4740
{
4841
if (IsSingleActiveConsumer && (Reference == null || Reference.Trim() == string.Empty))
@@ -51,34 +44,17 @@ internal void Validate()
5144
}
5245
}
5346

47+
public IOffsetType OffsetSpec { get; set; } = new OffsetTypeNext();
48+
5449
// stream name where the consumer will consume the messages.
5550
// stream must exist before the consumer is created.
5651
public string Stream { get; set; }
57-
public string Reference { get; set; }
5852
public Func<Consumer, MessageContext, Message, Task> MessageHandler { get; set; }
59-
public Func<string, Task> ConnectionClosedHandler { get; set; }
60-
61-
public IOffsetType OffsetSpec { get; set; } = new OffsetTypeNext();
62-
63-
// ClientProvidedName is used to identify TCP connection name.
64-
public string ClientProvidedName { get; set; } = "dotnet-stream-consumer";
6553

6654
public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
67-
68-
// SingleActiveConsumer is used to indicate that there is only one consumer active for the stream.
69-
// given a consumer reference.
70-
// Consumer Reference can't be null or Empty.
71-
72-
public bool IsSingleActiveConsumer { get; set; } = false;
73-
74-
// config.ConsumerUpdateListener is the callback for when the consumer is updated due
75-
// to single active consumer.
76-
// return IOffsetType to indicate the offset to be used for the next consumption.
77-
// if the ConsumerUpdateListener==null the OffsetSpec will be used.
78-
public Func<string, string, bool, Task<IOffsetType>> ConsumerUpdateListener { get; set; } = null;
7955
}
8056

81-
public class Consumer : AbstractEntity, IDisposable
57+
public class Consumer : AbstractEntity, IConsumer, IDisposable
8258
{
8359
private bool _disposed;
8460
private readonly ConsumerConfig config;
@@ -109,7 +85,7 @@ public async Task StoreOffset(ulong offset)
10985
await client.StoreOffset(config.Reference, config.Stream, offset);
11086
}
11187

112-
public static async Task<Consumer> Create(ClientParameters clientParameters,
88+
public static async Task<IConsumer> Create(ClientParameters clientParameters,
11389
ConsumerConfig config,
11490
StreamInfo metaStreamInfo)
11591
{
@@ -159,11 +135,18 @@ private async Task Init()
159135
continue;
160136
}
161137

162-
var message = Message.From(messageEntry.Data);
163-
await config.MessageHandler(this,
164-
new MessageContext(messageEntry.Offset,
165-
TimeSpan.FromMilliseconds(deliver.Chunk.Timestamp)),
166-
message);
138+
try
139+
{
140+
var message = Message.From(messageEntry.Data);
141+
await config.MessageHandler(this,
142+
new MessageContext(messageEntry.Offset,
143+
TimeSpan.FromMilliseconds(deliver.Chunk.Timestamp)),
144+
message);
145+
}
146+
catch (Exception e)
147+
{
148+
LogEventSource.Log.LogError($"Error while processing message {messageEntry.Offset} {e}");
149+
}
167150
}
168151

169152
// give one credit after each chunk
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2020 VMware, Inc.
4+
5+
using System;
6+
using System.Threading.Tasks;
7+
8+
namespace RabbitMQ.Stream.Client;
9+
10+
public interface IConsumer
11+
{
12+
public Task StoreOffset(ulong offset);
13+
public Task<ResponseCode> Close();
14+
public void Dispose();
15+
}
16+
17+
public record IConsumerConfig : INamedEntity
18+
{
19+
// StoredOffsetSpec configuration it is needed to keep the offset spec.
20+
// since the offset can be decided from the ConsumerConfig.OffsetSpec.
21+
// and from ConsumerConfig.ConsumerUpdateListener.
22+
// needed also See also consumer:MaybeDispatch/1.
23+
// It is not public because it is not needed for the user.
24+
internal IOffsetType StoredOffsetSpec { get; set; }
25+
26+
// ClientProvidedName is used to identify TCP connection name.
27+
public string ClientProvidedName { get; set; } = "dotnet-stream-consumer";
28+
29+
// SingleActiveConsumer is used to indicate that there is only one consumer active for the stream.
30+
// given a consumer reference.
31+
// Consumer Reference can't be null or Empty.
32+
public bool IsSingleActiveConsumer { get; set; } = false;
33+
34+
// config.ConsumerUpdateListener is the callback for when the consumer is updated due
35+
// to single active consumer.
36+
// return IOffsetType to indicate the offset to be used for the next consumption.
37+
// if the ConsumerUpdateListener==null the OffsetSpec will be used.
38+
public Func<string, string, bool, Task<IOffsetType>> ConsumerUpdateListener { get; set; } = null;
39+
40+
public string Reference { get; set; }
41+
42+
public Func<string, Task> ConnectionClosedHandler { get; set; }
43+
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -278,22 +278,12 @@ RabbitMQ.Stream.Client.Consumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.
278278
RabbitMQ.Stream.Client.Consumer.Dispose() -> void
279279
RabbitMQ.Stream.Client.Consumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task
280280
RabbitMQ.Stream.Client.ConsumerConfig
281-
RabbitMQ.Stream.Client.ConsumerConfig.ClientProvidedName.get -> string
282-
RabbitMQ.Stream.Client.ConsumerConfig.ClientProvidedName.set -> void
283-
RabbitMQ.Stream.Client.ConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
284-
RabbitMQ.Stream.Client.ConsumerConfig.ConnectionClosedHandler.set -> void
285-
RabbitMQ.Stream.Client.ConsumerConfig.ConsumerUpdateListener.get -> System.Func<string, string, bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>>
286-
RabbitMQ.Stream.Client.ConsumerConfig.ConsumerUpdateListener.set -> void
287-
RabbitMQ.Stream.Client.ConsumerConfig.IsSingleActiveConsumer.get -> bool
288-
RabbitMQ.Stream.Client.ConsumerConfig.IsSingleActiveConsumer.set -> void
289281
RabbitMQ.Stream.Client.ConsumerConfig.MessageHandler.get -> System.Func<RabbitMQ.Stream.Client.Consumer, RabbitMQ.Stream.Client.MessageContext, RabbitMQ.Stream.Client.Message, System.Threading.Tasks.Task>
290282
RabbitMQ.Stream.Client.ConsumerConfig.MessageHandler.set -> void
291283
RabbitMQ.Stream.Client.ConsumerConfig.MetadataHandler.get -> System.Action<RabbitMQ.Stream.Client.MetaDataUpdate>
292284
RabbitMQ.Stream.Client.ConsumerConfig.MetadataHandler.set -> void
293285
RabbitMQ.Stream.Client.ConsumerConfig.OffsetSpec.get -> RabbitMQ.Stream.Client.IOffsetType
294286
RabbitMQ.Stream.Client.ConsumerConfig.OffsetSpec.set -> void
295-
RabbitMQ.Stream.Client.ConsumerConfig.Reference.get -> string
296-
RabbitMQ.Stream.Client.ConsumerConfig.Reference.set -> void
297287
RabbitMQ.Stream.Client.ConsumerConfig.Stream.get -> string
298288
RabbitMQ.Stream.Client.ConsumerConfig.Stream.set -> void
299289
RabbitMQ.Stream.Client.ConsumerUpdateQueryResponse
@@ -426,6 +416,21 @@ RabbitMQ.Stream.Client.ICompressionCodec.MessagesCount.get -> int
426416
RabbitMQ.Stream.Client.ICompressionCodec.UnCompress(System.Buffers.ReadOnlySequence<byte> source, uint dataLen, uint unCompressedDataSize) -> System.Buffers.ReadOnlySequence<byte>
427417
RabbitMQ.Stream.Client.ICompressionCodec.UnCompressedSize.get -> int
428418
RabbitMQ.Stream.Client.ICompressionCodec.Write(System.Span<byte> span) -> int
419+
RabbitMQ.Stream.Client.IConsumer
420+
RabbitMQ.Stream.Client.IConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
421+
RabbitMQ.Stream.Client.IConsumer.Dispose() -> void
422+
RabbitMQ.Stream.Client.IConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task
423+
RabbitMQ.Stream.Client.IConsumerConfig
424+
RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.get -> string
425+
RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.set -> void
426+
RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
427+
RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.set -> void
428+
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.get -> System.Func<string, string, bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>>
429+
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.set -> void
430+
RabbitMQ.Stream.Client.IConsumerConfig.IsSingleActiveConsumer.get -> bool
431+
RabbitMQ.Stream.Client.IConsumerConfig.IsSingleActiveConsumer.set -> void
432+
RabbitMQ.Stream.Client.IConsumerConfig.Reference.get -> string
433+
RabbitMQ.Stream.Client.IConsumerConfig.Reference.set -> void
429434
RabbitMQ.Stream.Client.INamedEntity
430435
RabbitMQ.Stream.Client.INamedEntity.ClientProvidedName.get -> string
431436
RabbitMQ.Stream.Client.INamedEntity.ClientProvidedName.set -> void
@@ -703,6 +708,10 @@ RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.PublisherDoesNotExist = 18 ->
703708
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.StreamNotAvailable = 6 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
704709
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.UndefinedError = 200 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
705710
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.WaitForConfirmation = 0 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
711+
RabbitMQ.Stream.Client.Reliable.ConsumerFactory
712+
RabbitMQ.Stream.Client.Reliable.ConsumerFactory.ConsumerFactory() -> void
713+
RabbitMQ.Stream.Client.Reliable.ConsumerFactory.CreateConsumer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
714+
RabbitMQ.Stream.Client.Reliable.ConsumerFactory._reliableConsumerConfig -> RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig
706715
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
707716
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected(string connectionInfo) -> System.Threading.Tasks.ValueTask
708717
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(string connectionInfo) -> System.Threading.Tasks.ValueTask<bool>
@@ -740,7 +749,9 @@ RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ConsumerUpdateListener.ge
740749
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.ConsumerUpdateListener.set -> void
741750
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.IsSingleActiveConsumer.get -> bool
742751
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.IsSingleActiveConsumer.set -> void
743-
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.MessageHandler.get -> System.Func<RabbitMQ.Stream.Client.Consumer, RabbitMQ.Stream.Client.MessageContext, RabbitMQ.Stream.Client.Message, System.Threading.Tasks.Task>
752+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.IsSuperStream.get -> bool
753+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.IsSuperStream.set -> void
754+
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.MessageHandler.get -> System.Func<string, RabbitMQ.Stream.Client.Consumer, RabbitMQ.Stream.Client.MessageContext, RabbitMQ.Stream.Client.Message, System.Threading.Tasks.Task>
744755
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.MessageHandler.set -> void
745756
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.OffsetSpec.get -> RabbitMQ.Stream.Client.IOffsetType
746757
RabbitMQ.Stream.Client.Reliable.ReliableConsumerConfig.OffsetSpec.set -> void
@@ -872,13 +883,15 @@ RabbitMQ.Stream.Client.StreamSpec.Name.init -> void
872883
RabbitMQ.Stream.Client.StreamSpec.StreamSpec(string Name) -> void
873884
RabbitMQ.Stream.Client.StreamSystem
874885
RabbitMQ.Stream.Client.StreamSystem.Close() -> System.Threading.Tasks.Task
875-
RabbitMQ.Stream.Client.StreamSystem.CreateConsumer(RabbitMQ.Stream.Client.ConsumerConfig consumerConfig) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Consumer>
886+
RabbitMQ.Stream.Client.StreamSystem.CreateConsumer(RabbitMQ.Stream.Client.ConsumerConfig consumerConfig) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
876887
RabbitMQ.Stream.Client.StreamSystem.CreateProducer(RabbitMQ.Stream.Client.ProducerConfig producerConfig) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
877888
RabbitMQ.Stream.Client.StreamSystem.CreateStream(RabbitMQ.Stream.Client.StreamSpec spec) -> System.Threading.Tasks.Task
889+
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.SuperStreamConsumerConfig superStreamConsumerConfig) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
878890
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamProducer(RabbitMQ.Stream.Client.SuperStreamProducerConfig superStreamProducerConfig) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
879891
RabbitMQ.Stream.Client.StreamSystem.DeleteStream(string stream) -> System.Threading.Tasks.Task
880892
RabbitMQ.Stream.Client.StreamSystem.IsClosed.get -> bool
881893
RabbitMQ.Stream.Client.StreamSystem.QueryOffset(string reference, string stream) -> System.Threading.Tasks.Task<ulong>
894+
RabbitMQ.Stream.Client.StreamSystem.QueryPartition(string superStream) -> System.Threading.Tasks.Task<string[]>
882895
RabbitMQ.Stream.Client.StreamSystem.QuerySequence(string reference, string stream) -> System.Threading.Tasks.Task<ulong>
883896
RabbitMQ.Stream.Client.StreamSystem.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
884897
RabbitMQ.Stream.Client.StreamSystemConfig
@@ -916,6 +929,17 @@ RabbitMQ.Stream.Client.SubscribeResponse.ResponseCode.get -> RabbitMQ.Stream.Cli
916929
RabbitMQ.Stream.Client.SubscribeResponse.SizeNeeded.get -> int
917930
RabbitMQ.Stream.Client.SubscribeResponse.SubscribeResponse() -> void
918931
RabbitMQ.Stream.Client.SubscribeResponse.Write(System.Span<byte> span) -> int
932+
RabbitMQ.Stream.Client.SuperStreamConsumer
933+
RabbitMQ.Stream.Client.SuperStreamConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
934+
RabbitMQ.Stream.Client.SuperStreamConsumer.Dispose() -> void
935+
RabbitMQ.Stream.Client.SuperStreamConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task
936+
RabbitMQ.Stream.Client.SuperStreamConsumerConfig
937+
RabbitMQ.Stream.Client.SuperStreamConsumerConfig.MessageHandler.get -> System.Func<string, RabbitMQ.Stream.Client.Consumer, RabbitMQ.Stream.Client.MessageContext, RabbitMQ.Stream.Client.Message, System.Threading.Tasks.Task>
938+
RabbitMQ.Stream.Client.SuperStreamConsumerConfig.MessageHandler.set -> void
939+
RabbitMQ.Stream.Client.SuperStreamConsumerConfig.OffsetSpec.get -> System.Collections.Concurrent.ConcurrentDictionary<string, RabbitMQ.Stream.Client.IOffsetType>
940+
RabbitMQ.Stream.Client.SuperStreamConsumerConfig.OffsetSpec.set -> void
941+
RabbitMQ.Stream.Client.SuperStreamConsumerConfig.SuperStream.get -> string
942+
RabbitMQ.Stream.Client.SuperStreamConsumerConfig.SuperStream.set -> void
919943
RabbitMQ.Stream.Client.SuperStreamProducer
920944
RabbitMQ.Stream.Client.SuperStreamProducer.BatchSend(System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages) -> System.Threading.Tasks.ValueTask
921945
RabbitMQ.Stream.Client.SuperStreamProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
@@ -978,7 +1002,7 @@ static RabbitMQ.Stream.Client.Client.Create(RabbitMQ.Stream.Client.ClientParamet
9781002
static RabbitMQ.Stream.Client.CompressionHelper.Compress(System.Collections.Generic.List<RabbitMQ.Stream.Client.Message> messages, RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec
9791003
static RabbitMQ.Stream.Client.CompressionHelper.UnCompress(RabbitMQ.Stream.Client.CompressionType compressionType, System.Buffers.ReadOnlySequence<byte> source, uint dataLen, uint unCompressedDataSize) -> System.Buffers.ReadOnlySequence<byte>
9801004
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
981-
static RabbitMQ.Stream.Client.Consumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.ConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Consumer>
1005+
static RabbitMQ.Stream.Client.Consumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.ConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
9821006
static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.Client.LeaderLocator
9831007
static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator
9841008
static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator
@@ -996,5 +1020,6 @@ static RabbitMQ.Stream.Client.StreamCompressionCodecs.RegisterCodec<T>(RabbitMQ.
9961020
static RabbitMQ.Stream.Client.StreamCompressionCodecs.UnRegisterCodec(RabbitMQ.Stream.Client.CompressionType compressionType) -> void
9971021
static RabbitMQ.Stream.Client.StreamSystem.Create(RabbitMQ.Stream.Client.StreamSystemConfig config) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamSystem>
9981022
static RabbitMQ.Stream.Client.SubEntryPublish.Version.get -> byte
1023+
static RabbitMQ.Stream.Client.SuperStreamConsumer.Create(RabbitMQ.Stream.Client.SuperStreamConsumerConfig superStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters) -> RabbitMQ.Stream.Client.IConsumer
9991024
static RabbitMQ.Stream.Client.SuperStreamProducer.Create(RabbitMQ.Stream.Client.SuperStreamProducerConfig superStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters) -> RabbitMQ.Stream.Client.IProducer
10001025
static RabbitMQ.Stream.Client.Version.VersionString.get -> string

0 commit comments

Comments
 (0)