Skip to content

Commit f9442d6

Browse files
committed
* Fix the StreamFilter test.
1 parent 141c4b5 commit f9442d6

File tree

4 files changed

+17
-23
lines changed

4 files changed

+17
-23
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ private enum PauseStatus
2828
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
2929
private readonly ConsumerConfiguration _configuration;
3030

31-
public AmqpConsumer(ConsumerConfiguration configuration)
31+
internal AmqpConsumer(ConsumerConfiguration configuration)
3232
{
3333
_configuration = configuration;
3434
_configuration.Connection.AddConsumer(_id, this);
@@ -108,6 +108,7 @@ await receiverLinkTask.WaitAsync(waitSpan)
108108
// TODO save / cancel task
109109
_ = Task.Run(ProcessMessages);
110110

111+
// TODO cancellation token
111112
await base.OpenAsync()
112113
.ConfigureAwait(false);
113114
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ namespace RabbitMQ.AMQP.Client.Impl
1414
/// <summary>
1515
/// ConsumerConfiguration is a helper class that holds the configuration for the consumer
1616
/// </summary>
17-
public class ConsumerConfiguration
17+
internal sealed class ConsumerConfiguration
1818
{
1919
public AmqpConnection Connection { get; set; } = null!;
2020
public string Address { get; set; } = "";
21-
public int InitialCredits { get; set; } = 10;
21+
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
2222
public Map Filters { get; set; } = new();
2323
public MessageHandler? Handler { get; set; }
2424
// TODO re-name to ListenerContextAction? Callback?

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Publishers.get -> System.Collections.Ge
287287
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
288288
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
289289
RabbitMQ.AMQP.Client.Impl.AmqpConsumer
290-
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.AmqpConsumer(RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration! configuration) -> void
291290
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Pause() -> void
292291
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Unpause() -> void
293292
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.UnsettledMessageCount.get -> long
@@ -488,19 +487,6 @@ RabbitMQ.AMQP.Client.Impl.Consts
488487
RabbitMQ.AMQP.Client.Impl.Consts.Consts() -> void
489488
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
490489
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
491-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration
492-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Address.get -> string!
493-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Address.set -> void
494-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection!
495-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Connection.set -> void
496-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.ConsumerConfiguration() -> void
497-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Filters.get -> Amqp.Types.Map!
498-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Filters.set -> void
499-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Handler.get -> RabbitMQ.AMQP.Client.MessageHandler?
500-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Handler.set -> void
501-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.get -> int
502-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.set -> void
503-
RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.ListenerContext -> System.Action<RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext!>?
504490
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>
505491
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.Address() -> string!
506492
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder<T>.DefaultAddressBuilder() -> void

Tests/Consumer/StreamConsumerTests.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,19 +217,22 @@ public async Task StreamFiltering()
217217
IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.STREAM);
218218
await queueSpec.DeclareAsync();
219219

220-
var publishTasks = new List<Task>();
221220
foreach (string w in waves)
222221
{
223222
void ml(ulong idx, IMessage msg)
224223
{
225-
msg.MessageId(idx);
226-
msg.Annotation("x-stream-filter-value", w);
224+
msg.MessageId(idx)
225+
.Annotation("x-stream-filter-value", w);
227226
}
228-
publishTasks.Add(PublishAsync(queueSpec, messageWaveCount, ml));
229227

228+
/*
229+
* Note:
230+
* If publishing is done async, then messages must be filtered
231+
* as they are received because you could get "wrong" values
232+
* due to how the bloom filter works
233+
*/
234+
await PublishAsync(queueSpec, messageWaveCount, ml);
230235
}
231-
await WhenAllComplete(publishTasks);
232-
publishTasks.Clear();
233236

234237
long receivedCount = 0;
235238
Exception? messageHandlerEx = null;
@@ -238,6 +241,8 @@ async Task messageHandler(IContext cxt, IMessage msg)
238241
try
239242
{
240243
Interlocked.Increment(ref receivedCount);
244+
// _testOutputHelper.WriteLine($"id {0} x-stream-filter-value {1}",
245+
// msg.MessageId(), msg.Annotation("x-stream-filter-value"));
241246
await cxt.AcceptAsync();
242247
}
243248
catch (Exception ex)
@@ -260,6 +265,7 @@ async Task messageHandler(IContext cxt, IMessage msg)
260265
await WaitUntilStable(() => (int)Interlocked.Read(ref receivedCount));
261266
}
262267

268+
// _testOutputHelper.WriteLine("0: receivedCount: {0}", receivedCount);
263269
Assert.Null(messageHandlerEx);
264270
Assert.True(receivedCount >= messageWaveCount);
265271
Assert.True(receivedCount < allMessagesCount,
@@ -280,6 +286,7 @@ async Task messageHandler(IContext cxt, IMessage msg)
280286
await WaitUntilStable(() => (int)Interlocked.Read(ref receivedCount));
281287
}
282288

289+
// _testOutputHelper.WriteLine("1: receivedCount: {0}", receivedCount);
283290
Assert.Null(messageHandlerEx);
284291
Assert.True(receivedCount >= 2 * messageWaveCount);
285292
Assert.True(receivedCount < allMessagesCount,

0 commit comments

Comments
 (0)