Skip to content

Commit 66cbd13

Browse files
authored
Add Chunk info consumer side (#355)
* Add chunk info on the consumer side * Closes #331 --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent dd8a4ce commit 66cbd13

File tree

5 files changed

+32
-9
lines changed

5 files changed

+32
-9
lines changed

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,6 @@ RabbitMQ.Stream.Client.Message.Size.get -> int
395395
RabbitMQ.Stream.Client.Message.Write(System.Span<byte> span) -> int
396396
RabbitMQ.Stream.Client.MessageContext
397397
RabbitMQ.Stream.Client.MessageContext.MessageContext() -> void
398-
RabbitMQ.Stream.Client.MessageContext.MessageContext(ulong offset, System.TimeSpan timestamp) -> void
399398
RabbitMQ.Stream.Client.MessageContext.Offset.get -> ulong
400399
RabbitMQ.Stream.Client.MessageContext.Timestamp.get -> System.TimeSpan
401400
RabbitMQ.Stream.Client.MetaDataQuery

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.C
155155
RabbitMQ.Stream.Client.KeyRoutingStrategy
156156
RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>> routingKeyQFunc, string superStream) -> void
157157
RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
158+
RabbitMQ.Stream.Client.MessageContext.ChunkId.get -> ulong
159+
RabbitMQ.Stream.Client.MessageContext.ChunkMessagesCount.get -> uint
160+
RabbitMQ.Stream.Client.MessageContext.MessageContext(ulong offset, System.TimeSpan timestamp, uint chunkMessagesCount, ulong chunkId) -> void
158161
RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTime dateTime) -> void
159162
RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTimeOffset dateTimeOffset) -> void
160163
RabbitMQ.Stream.Client.ProducerFilter

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,22 @@ public struct MessageContext
3434
/// </summary>
3535
public TimeSpan Timestamp { get; }
3636

37-
public MessageContext(ulong offset, TimeSpan timestamp)
37+
/// <summary>
38+
/// The number of messages in the current chunk
39+
/// </summary>
40+
public uint ChunkMessagesCount { get; }
41+
42+
/// <summary>
43+
/// It is the chunk id that can help to understand the ChuckMessagesCount
44+
/// </summary>
45+
public ulong ChunkId { get; }
46+
47+
public MessageContext(ulong offset, TimeSpan timestamp, uint chunkMessagesCount, ulong chunkId)
3848
{
3949
Offset = offset;
4050
Timestamp = timestamp;
51+
ChunkMessagesCount = chunkMessagesCount;
52+
ChunkId = chunkId;
4153
}
4254
}
4355

@@ -306,7 +318,8 @@ async Task DispatchMessage(Message message, ulong i)
306318
{
307319
await _config.MessageHandler(this,
308320
new MessageContext(message.MessageOffset,
309-
TimeSpan.FromMilliseconds(chunk.Timestamp)),
321+
TimeSpan.FromMilliseconds(chunk.Timestamp),
322+
chunk.NumRecords, chunk.ChunkId),
310323
message).ConfigureAwait(false);
311324
}
312325
}

Tests/RawConsumerSystemTests.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System;
66
using System.Buffers;
77
using System.Collections.Generic;
8+
using System.Linq;
89
using System.Net;
910
using System.Text;
1011
using System.Threading;
@@ -38,13 +39,15 @@ public async void CreateRawConsumer()
3839
var rawProducer = await system.CreateRawProducer(
3940
new RawProducerConfig(stream) { Reference = "producer" });
4041
var identifierReceived = "";
42+
uint messagesInTheChunk = 0;
4143
var consumer = await system.CreateRawConsumer(
4244
new RawConsumerConfig(stream)
4345
{
4446
Reference = "consumer",
4547
Identifier = "consumer_identifier_999",
4648
MessageHandler = async (sourceConsumer, ctx, message) =>
4749
{
50+
messagesInTheChunk = ctx.ChunkMessagesCount;
4851
identifierReceived = sourceConsumer.Info.Identifier;
4952
testPassed.SetResult(message.Data);
5053
await Task.CompletedTask;
@@ -59,6 +62,7 @@ public async void CreateRawConsumer()
5962

6063
Assert.Equal(msgData.Contents.ToArray(), testPassed.Task.Result.Contents.ToArray());
6164
Assert.Equal("consumer_identifier_999", identifierReceived);
65+
Assert.Equal((uint)1, messagesInTheChunk);
6266
rawProducer.Dispose();
6367
consumer.Dispose();
6468
await system.DeleteStream(stream);
@@ -250,12 +254,15 @@ void AssertMessages(IReadOnlyList<Message> expected, IReadOnlyList<Message> actu
250254
await system.CreateStream(new StreamSpec(stream));
251255

252256
var receivedMessages = new List<Message>();
257+
var messagesInTheChunks = new Dictionary<ulong, uint>();
258+
253259
var consumer = await system.CreateRawConsumer(
254260
new RawConsumerConfig(stream)
255261
{
256262
Reference = "consumer",
257263
MessageHandler = async (consumer, ctx, message) =>
258264
{
265+
messagesInTheChunks[ctx.ChunkId] = ctx.ChunkMessagesCount;
259266
receivedMessages.Add(message);
260267
if (receivedMessages.Count == 10)
261268
{
@@ -284,7 +291,8 @@ void AssertMessages(IReadOnlyList<Message> expected, IReadOnlyList<Message> actu
284291

285292
AssertMessages(messagesGzip, testPassed.Task.Result.FindAll(s =>
286293
Encoding.Default.GetString(s.Data.Contents.ToArray()).Contains("Gzip_")));
287-
294+
Assert.Equal(10, messagesInTheChunks.Sum(x => x.Value));
295+
Assert.Equal((ulong)0, messagesInTheChunks.FirstOrDefault().Key);
288296
rawProducer.Dispose();
289297
consumer.Dispose();
290298
await system.DeleteStream(stream);

docs/ReliableClient/Program.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
ProducersPerConnection = 100,
1111
ConsumersPerConnection = 100,
1212
Host = "node0",
13-
Port = 5552,
14-
LoadBalancer = true,
13+
Port = 5562,
14+
LoadBalancer = false,
1515
SuperStream = false,
1616
Streams = 3,
1717
Producers = 10,
18-
MessagesPerProducer = 50_000_000,
18+
MessagesPerProducer = 10_000_000,
1919
Consumers = 10,
20-
Username = "test",
21-
Password = "test"
20+
// Username = "test",
21+
// Password = "test"
2222
});
2323

2424
await rClient.ConfigureAwait(false);

0 commit comments

Comments
 (0)