Skip to content

Commit b91ab49

Browse files
Refactor Classes names (#177)
* Refactor Classes names Closes: #173 Closes: #171 Signed-off-by: Gabriele Santomaggio <[email protected]> Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Luke Bakken <[email protected]>
1 parent 3516e97 commit b91ab49

26 files changed

+601
-568
lines changed

.ci/versions.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"erlang": "25.0.4",
3-
"rabbitmq": "3.11.0"
2+
"erlang": "25.1.2",
3+
"rabbitmq": "3.11.2"
44
}

Examples/Performances/BatchVsBatchSend.cs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,8 @@ private static async Task RProducerSend(string stream, StreamSystem system)
3535
var total = 0;
3636
var confirmed = 0;
3737
var error = 0;
38-
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
38+
var reliableProducer = await Producer.Create(new ProducerConfig(system, stream)
3939
{
40-
Stream = stream,
41-
StreamSystem = system,
4240
MaxInFlight = 1_000_000,
4341
ConfirmationHandler = messagesConfirmed =>
4442
{
@@ -87,10 +85,8 @@ private static async Task RProducerBatchSend(string stream, StreamSystem system)
8785
var total = 0;
8886
var confirmed = 0;
8987
var error = 0;
90-
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
88+
var producer = await Producer.Create(new ProducerConfig(system,stream)
9189
{
92-
Stream = stream,
93-
StreamSystem = system,
9490
MaxInFlight = 1_000_000,
9591
ConfirmationHandler = messagesConfirmed =>
9692
{
@@ -122,7 +118,7 @@ private static async Task RProducerBatchSend(string stream, StreamSystem system)
122118
messages.Add(new Message(array));
123119
if (i % AggregateBatchSize == 0)
124120
{
125-
await reliableProducer.BatchSend(messages);
121+
await producer.Send(messages);
126122
messages.Clear();
127123
}
128124

@@ -132,23 +128,22 @@ private static async Task RProducerBatchSend(string stream, StreamSystem system)
132128
}
133129
}
134130

135-
await reliableProducer.BatchSend(messages);
131+
await producer.Send(messages);
136132
messages.Clear();
137133

138134
Console.WriteLine(
139135
$"*****Reliable Producer Batch Send***** time: {DateTime.Now - start}, messages sent: {TotalMessages}");
140136
Thread.Sleep(1000);
141-
await reliableProducer.Close();
137+
await producer.Close();
142138
}
143139

144140

145141
private static async Task StandardProducerSend(string stream, StreamSystem system)
146142
{
147143
Console.WriteLine("*****Standard Producer Send*****");
148144
var confirmed = 0;
149-
var producer = await system.CreateProducer(new ProducerConfig()
145+
var producer = await system.CreateRawProducer(new RawProducerConfig(stream)
150146
{
151-
Stream = stream,
152147
MaxInFlight = 1_000_000,
153148
ConfirmHandler = _ =>
154149
{
@@ -182,9 +177,8 @@ private static async Task BatchSend(StreamSystem system, string stream)
182177
{
183178
Console.WriteLine("*****Standard Batch Send*****");
184179
var confirmed = 0;
185-
var producer = await system.CreateProducer(new ProducerConfig()
180+
var producer = await system.CreateRawProducer(new RawProducerConfig(stream)
186181
{
187-
Stream = stream,
188182
MaxInFlight = 1_000_000,
189183
ConfirmHandler = _ =>
190184
{
@@ -202,7 +196,7 @@ private static async Task BatchSend(StreamSystem system, string stream)
202196
messages.Add((i, new Message(array)));
203197
if (i % AggregateBatchSize == 0)
204198
{
205-
await producer.BatchSend(messages);
199+
await producer.Send(messages);
206200
messages.Clear();
207201
}
208202

@@ -212,7 +206,7 @@ private static async Task BatchSend(StreamSystem system, string stream)
212206
}
213207
}
214208

215-
await producer.BatchSend(messages);
209+
await producer.Send(messages);
216210
messages.Clear();
217211

218212
Console.WriteLine(

RabbitMQ.Stream.Client.PerfTest/Program.fs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ let main argv =
2020
let mutable confirmed = 0
2121
let mutable prod = null
2222
let streamName = "dotnet-perftest"
23-
let consumerConfig = ConsumerConfig(Stream = streamName,
23+
let consumerConfig = RawConsumerConfig(streamName,
2424
Reference = Guid.NewGuid().ToString(),
2525
MessageHandler =
2626
fun c ctx m ->
@@ -33,12 +33,12 @@ let main argv =
3333
let! system = StreamSystem.Create config
3434
let! stream = system.CreateStream(StreamSpec(streamName))
3535
printfn $"Stream: {streamName}"
36-
let! consumer = system.CreateConsumer(consumerConfig)
37-
let producerConfig = ProducerConfig(Stream = streamName,
36+
let! consumer = system.CreateRawConsumer(consumerConfig)
37+
let producerConfig = RawProducerConfig(streamName,
3838
Reference = null,
3939
MaxInFlight = 10000,
4040
ConfirmHandler = fun c -> confirmed <- confirmed + 1)
41-
let! producer = system.CreateProducer producerConfig
41+
let! producer = system.CreateRawProducer producerConfig
4242
//make producer available to metrics async
4343
prod <- producer
4444
let msg = Message "asdf"B

RabbitMQ.Stream.Client/Client.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,14 +296,14 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
296296
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
297297
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null)
298298
{
299-
return await Subscribe(new ConsumerConfig() { Stream = stream, OffsetSpec = offsetType },
299+
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType },
300300
initialCredit,
301301
properties,
302302
deliverHandler,
303303
consumerUpdateHandler);
304304
}
305305

306-
public async Task<(byte, SubscribeResponse)> Subscribe(ConsumerConfig config,
306+
public async Task<(byte, SubscribeResponse)> Subscribe(RawConsumerConfig config,
307307
ushort initialCredit,
308308
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
309309
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
@@ -614,6 +614,7 @@ public async Task<bool> StreamExists(string stream)
614614
return response.StreamInfos is { Count: >= 1 } &&
615615
response.StreamInfos[stream].ResponseCode == ResponseCode.Ok;
616616
}
617+
617618
public async ValueTask<QueryOffsetResponse> QueryOffset(string reference, string stream)
618619
{
619620
return await Request<QueryOffsetRequest, QueryOffsetResponse>(corr =>

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public record IConsumerConfig : INamedEntity
2424
internal IOffsetType StoredOffsetSpec { get; set; }
2525

2626
// ClientProvidedName is used to identify TCP connection name.
27-
public string ClientProvidedName { get; set; } = "dotnet-stream-consumer";
27+
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-consumer";
2828

2929
// SingleActiveConsumer is used to indicate that there is only one consumer active for the stream.
3030
// given a consumer reference.

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,44 @@ namespace RabbitMQ.Stream.Client;
1616

1717
public interface IProducer
1818
{
19+
/// <summary>
20+
/// Send the message to the stream in asynchronous mode.
21+
/// The client will aggregate messages and send them in batches.
22+
/// The batch size is configurable. See IProducerConfig.BatchSize.
23+
/// </summary>
24+
/// <param name="publishingId">Publishing id</param>
25+
/// <param name="message"> Message </param>
26+
/// <returns></returns>
1927
public ValueTask Send(ulong publishingId, Message message);
20-
public ValueTask BatchSend(List<(ulong, Message)> messages);
2128

29+
/// <summary>
30+
/// Send the messages in batch to the stream in synchronous mode.
31+
/// The aggregation is provided by the user.
32+
/// The client will send the messages in the order they are provided.
33+
/// </summary>
34+
/// <param name="messages">Batch messages to send</param>
35+
/// <returns></returns>
36+
public ValueTask Send(List<(ulong, Message)> messages);
37+
38+
/// <summary>
39+
/// Enable sub-batch feature.
40+
/// It is needed when you need to sub aggregate the messages and compress them.
41+
/// For example you can aggregate 100 log messages and compress to reduce the space.
42+
/// One single publishingId can have multiple sub-batches messages.
43+
/// See also: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#sub-entry-batching-and-compression
44+
/// </summary>
45+
/// <param name="publishingId"></param>
46+
/// <param name="subEntryMessages">Messages to aggregate</param>
47+
/// <param name="compressionType"> Type of compression. By default the client supports GZIP and none</param>
48+
/// <returns></returns>
2249
public ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType);
2350

2451
public Task<ResponseCode> Close();
2552

53+
/// <summary>
54+
/// Return the last publishing id.
55+
/// </summary>
56+
/// <returns></returns>
2657
public Task<ulong> GetLastPublishingId();
2758

2859
public bool IsOpen();
@@ -41,7 +72,7 @@ public record IProducerConfig : INamedEntity
4172
{
4273
public string Reference { get; set; }
4374
public int MaxInFlight { get; set; } = 1000;
44-
public string ClientProvidedName { get; set; } = "dotnet-stream-producer";
75+
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";
4576

4677
public int BatchSize { get; set; } = 100;
4778

0 commit comments

Comments
 (0)