Skip to content

Commit efacb49

Browse files
committed
fixup
1 parent 3df3972 commit efacb49

File tree

3 files changed

+20
-15
lines changed

3 files changed

+20
-15
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,9 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)
139139

140140
public IConsumerBuilder.IStreamOptions FilterValue(string value)
141141
{
142+
var values = new List<string> { value };
142143
_filters[s_streamFilterSymbol] =
143-
new DescribedValue(s_streamFilterSymbol, value);
144+
new DescribedValue(s_streamFilterSymbol, values);
144145
return this;
145146
}
146147

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,14 @@ public IDictionary<object, object> Properties()
228228
return NativeMessage.ApplicationProperties.Map;
229229
}
230230

231+
#if TODO
232+
static void validateMessageAnnotationKey(String key) {
233+
if (!key.startsWith("x-")) {
234+
throw new IllegalArgumentException("Message annotation keys must start with 'x-': " + key);
235+
}
236+
}
237+
#endif
231238
// Annotations
232-
233239
public IMessage Annotation(string key, object value)
234240
{
235241
EnsureAnnotations();

Tests/Consumer/StreamConsumerTests.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,13 @@ void ml(ulong idx, IMessage msg)
231231
await WhenAllComplete(publishTasks);
232232
publishTasks.Clear();
233233

234-
var messages = new ConcurrentBag<IMessage>();
234+
long receivedCount = 0;
235235
Exception? messageHandlerEx = null;
236236
async Task messageHandler(IContext cxt, IMessage msg)
237237
{
238238
try
239239
{
240-
messages.Add(msg);
240+
Interlocked.Increment(ref receivedCount);
241241
await cxt.AcceptAsync();
242242
}
243243
catch (Exception ex)
@@ -257,14 +257,14 @@ async Task messageHandler(IContext cxt, IMessage msg)
257257

258258
using (IConsumer consumer = await consumerBuilder0.BuildAndStartAsync())
259259
{
260-
await WaitUntilStable(() => messages.Count);
260+
await WaitUntilStable(() => (int)Interlocked.Read(ref receivedCount));
261261
}
262262

263263
Assert.Null(messageHandlerEx);
264-
Assert.True(messages.Count >= messageWaveCount);
265-
Assert.True(messages.Count < allMessagesCount,
266-
$"0: messages.Count {messages.Count}, waveCount * messageWaveCount {allMessagesCount}");
267-
messages.Clear();
264+
Assert.True(receivedCount >= messageWaveCount);
265+
Assert.True(receivedCount < allMessagesCount,
266+
$"0: receivedCount {receivedCount}, waveCount * messageWaveCount {allMessagesCount}");
267+
receivedCount = 0;
268268

269269
IConsumerBuilder consumerBuilder1 = _connection.ConsumerBuilder()
270270
.Queue(_queueName)
@@ -277,15 +277,13 @@ async Task messageHandler(IContext cxt, IMessage msg)
277277

278278
using (IConsumer consumer = await consumerBuilder1.BuildAndStartAsync())
279279
{
280-
await WaitUntilStable(() => messages.Count);
280+
await WaitUntilStable(() => (int)Interlocked.Read(ref receivedCount));
281281
}
282282

283283
Assert.Null(messageHandlerEx);
284-
Assert.True(messages.Count >= 2 * messageWaveCount);
285-
//TODO note the Java test still asserts that messages.Count is less than allMessagesCount
286-
Assert.True(messages.Count <= allMessagesCount,
287-
$"1: messages.Count {messages.Count}, waveCount * messageWaveCount {allMessagesCount}");
288-
messages.Clear();
284+
Assert.True(receivedCount >= 2 * messageWaveCount);
285+
Assert.True(receivedCount < allMessagesCount,
286+
$"1: messages.Count {receivedCount}, waveCount * messageWaveCount {allMessagesCount}");
289287
}
290288

291289
/// <summary>

0 commit comments

Comments
 (0)