Skip to content

Commit 2b5177b

Browse files
lukebakkenTimz95Gsantomaggio
authored
Fix "Data instances with a length of 255 bytes are wrongly written" (#161)
* Added unit test for comparing the size of the data versus the actual written data. Tests the edge cases of determining if the data must be written with formatcode Vbin8 or Vbin32. * Fix inconsistent data size check in Data struct when writing the underlying data. * Added test for producing and consuming events with a size of 254-256 bytes. * Fixups from `dotnet format` * Replaced collections in TestEventLength-test with single variables to check on * Fixups from `dotnet format` * Refactor tests to be compliant with other tests add the same fix to GetSequenceSize remove the integration test file and move the test to ProducerSystemTests file Signed-off-by: Gabriele Santomaggio <[email protected]> Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Tim Heusschen <[email protected]> Co-authored-by: Timz95 <[email protected]> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent fc84f76 commit 2b5177b

File tree

4 files changed

+100
-2
lines changed

4 files changed

+100
-2
lines changed

RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private static int WriteTimestamp(Span<byte> seq, DateTime value)
212212
// determinate the type size
213213
public static int GetSequenceSize(ReadOnlySequence<byte> data)
214214
{
215-
if (data.Length < 256)
215+
if (data.Length <= byte.MaxValue)
216216
{
217217
return (int)data.Length +
218218
1 + //marker 1 byte FormatCode.Vbin8

RabbitMQ.Stream.Client/AMQP/Data.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public Data(ReadOnlySequence<byte> data)
2929
public int Write(Span<byte> span)
3030
{
3131
var offset = DescribedFormatCode.Write(span, DescribedFormatCode.ApplicationData);
32-
if (data.Length < byte.MaxValue)
32+
if (data.Length <= byte.MaxValue)
3333
{
3434
offset += WireFormatting.WriteByte(span.Slice(offset), FormatCode.Vbin8); //binary marker
3535
offset += WireFormatting.WriteByte(span.Slice(offset), (byte)data.Length); //length

Tests/Amqp10Tests.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,5 +462,19 @@ public void MapEntriesWithAnEmptyKeyShouldNotBeWrittenToTheWire()
462462
// we do not expect the new entry to be written
463463
Assert.Equal(expectedMapSize, actualMapSize);
464464
}
465+
466+
[Theory]
467+
// AmqpWireFormattingWrite.GetSequenceSize + DescribedFormat.Size
468+
[InlineData(254, 254 + 1 + 1 + 3)]
469+
[InlineData(255, 255 + 1 + 1 + 3)]
470+
[InlineData(256, 256 + 1 + 4 + 3)]
471+
public void WriteDataEdgeCasesTests(int size, int expectedLengthWritten)
472+
{
473+
var buffer = new Span<byte>(new byte[4096]);
474+
var bytes = new byte[size];
475+
var data = new Data(new ReadOnlySequence<byte>(bytes));
476+
var written = data.Write(buffer);
477+
Assert.Equal(expectedLengthWritten, written);
478+
}
465479
}
466480
}

Tests/ProducerSystemTests.cs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
// Copyright (c) 2007-2020 VMware, Inc.
44

55
using System;
6+
using System.Buffers;
7+
using System.Collections;
68
using System.Collections.Generic;
79
using System.Text;
810
using System.Threading;
@@ -341,5 +343,87 @@ public async void ProducerBatchConfirmNumberOfMessages()
341343
await system.DeleteStream(stream);
342344
await system.Close();
343345
}
346+
347+
private class EventLengthTestCases : IEnumerable<object[]>
348+
{
349+
private readonly Random _random = new(3895);
350+
351+
public IEnumerator<object[]> GetEnumerator()
352+
{
353+
yield return new object[] { GetRandomBytes(254) };
354+
yield return new object[] { GetRandomBytes(255) };
355+
yield return new object[] { GetRandomBytes(256) };
356+
// just to test an event greater than 256 bytes
357+
yield return new object[] { GetRandomBytes(654) };
358+
}
359+
360+
private ReadOnlySequence<byte> GetRandomBytes(ulong length)
361+
{
362+
var arr = new byte[length];
363+
_random.NextBytes(arr);
364+
return new ReadOnlySequence<byte>(arr);
365+
}
366+
367+
IEnumerator IEnumerable.GetEnumerator()
368+
{
369+
return GetEnumerator();
370+
}
371+
}
372+
373+
[Theory]
374+
[ClassData(typeof(EventLengthTestCases))]
375+
public async Task ProducerSendsArrays255Bytes(ReadOnlySequence<byte> @event)
376+
{
377+
// Test the data around 255 bytes
378+
// https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/160
379+
// We test if the data is correctly sent and received
380+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
381+
var testPassed = new TaskCompletionSource<bool>();
382+
var producer = await system.CreateProducer(new
383+
ProducerConfig
384+
{
385+
Reference = "producer",
386+
Stream = stream,
387+
ConfirmHandler = _ =>
388+
{
389+
testPassed.SetResult(true);
390+
}
391+
}
392+
);
393+
394+
const ulong PublishingId = 0;
395+
var msg = new Message(new Data(@event))
396+
{
397+
ApplicationProperties = new ApplicationProperties { { "myArray", @event.First.ToArray() } }
398+
};
399+
400+
await producer.Send(PublishingId, msg);
401+
new Utils<bool>(testOutputHelper).WaitUntilTaskCompletes(testPassed);
402+
403+
var testMessageConsumer = new TaskCompletionSource<Message>();
404+
405+
var consumer = await system.CreateConsumer(new ConsumerConfig
406+
{
407+
Stream = stream,
408+
// Consume the stream from the Offset
409+
OffsetSpec = new OffsetTypeOffset(),
410+
// Receive the messages
411+
MessageHandler = (_, _, message) =>
412+
{
413+
testMessageConsumer.SetResult(message);
414+
return Task.CompletedTask;
415+
}
416+
});
417+
418+
new Utils<Message>(testOutputHelper).WaitUntilTaskCompletes(testMessageConsumer);
419+
// at this point the data length _must_ be the same
420+
Assert.Equal(@event.Length, testMessageConsumer.Task.Result.Data.Contents.Length);
421+
422+
Assert.Equal(@event.Length,
423+
((byte[])testMessageConsumer.Task.Result.ApplicationProperties["myArray"]).Length);
424+
await consumer.Close();
425+
await system.DeleteStream(stream);
426+
await system.Close();
427+
}
344428
}
345429
}

0 commit comments

Comments
 (0)