Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,36 @@ public async Task PollMessages_WithHeaders_Should_PollMessages_Successfully(Prot
}
}

[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task PollMessages_WithHeaders_Should_AutoParseUserHeadersFromRaw(Protocol protocol)
{
var (client, streamName) = await CreateStreamWithMessages(protocol);

var response = await client.PollMessagesAsync(new MessageFetchRequest
{
Count = 1,
AutoCommit = true,
Consumer = Consumer.New(2),
PartitionId = 0,
PollingStrategy = PollingStrategy.First(),
StreamId = Identifier.String(streamName),
TopicId = Identifier.String(HeadersTopicName)
});

response.Messages.Count.ShouldBe(1);
var msg = response.Messages[0];

Dictionary<HeaderKey, HeaderValue>? parsed = msg.UserHeaders;
parsed.ShouldNotBeNull();
parsed!.Count.ShouldBe(2);
parsed[HeaderKey.FromString("header1")].ToString().ShouldBe("value1");
parsed[HeaderKey.FromString("header2")].ToInt32().ShouldBe(14);

Dictionary<HeaderKey, HeaderValue>? second = msg.UserHeaders;
second.ShouldBeSameAs(parsed);
}

private static Message[] CreateMessagesWithoutHeader(int count)
{
var messages = new List<Message>();
Expand Down
272 changes: 272 additions & 0 deletions foreign/csharp/Iggy_SDK.Tests.Integration/IggyTypedConsumerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System.Text;
using Apache.Iggy.Consumers;
using Apache.Iggy.Enums;
using Apache.Iggy.Exceptions;
using Apache.Iggy.IggyClient;
using Apache.Iggy.Kinds;
using Apache.Iggy.Messages;
using Apache.Iggy.Tests.Integrations.Fixtures;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Partitioning = Apache.Iggy.Kinds.Partitioning;

namespace Apache.Iggy.Tests.Integrations;

public class IggyTypedConsumerTests
{
[ClassDataSource<IggyServerFixture>(Shared = SharedType.PerAssembly)]
public required IggyServerFixture Fixture { get; init; }

[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task ReceiveDeserializedAsync_Should_YieldMessages_WithCorrectData(Protocol protocol)
{
var client = protocol == Protocol.Tcp
? await Fixture.CreateTcpClient()
: await Fixture.CreateHttpClient();

var testStream = await CreateTestStreamWithMessages(client, protocol);

var consumerId = protocol == Protocol.Tcp ? 200 : 300;
IggyConsumer<string> consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId),
AutoCommitMode.Disabled, new Utf8StringDeserializer());

await consumer.InitAsync();

var received = new List<ReceivedMessage<string>>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await foreach (ReceivedMessage<string> msg in consumer.ReceiveDeserializedAsync(cts.Token))
{
msg.ShouldNotBeNull();
msg.Status.ShouldBe(MessageStatus.Success);
msg.Data.ShouldNotBeNull();
msg.Data.ShouldStartWith("Test message");
msg.PartitionId.ShouldBe(1u);
msg.Error.ShouldBeNull();
received.Add(msg);
if (received.Count >= 10)
{
break;
}
}

received.Count.ShouldBeGreaterThanOrEqualTo(10);
await consumer.DisposeAsync();
}

[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task ReceiveDeserializedAsync_WithoutInit_Should_Throw_ConsumerNotInitializedException(
Protocol protocol)
{
var client = protocol == Protocol.Tcp
? await Fixture.CreateTcpClient()
: await Fixture.CreateHttpClient();

var testStream = await CreateTestStreamWithMessages(client, protocol);

var consumerId = protocol == Protocol.Tcp ? 201 : 301;
IggyConsumer<string> consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId),
AutoCommitMode.Disabled, new Utf8StringDeserializer());

await Should.ThrowAsync<ConsumerNotInitializedException>(async () =>
{
await foreach (ReceivedMessage<string> _ in consumer.ReceiveDeserializedAsync())
{
}
});

await consumer.DisposeAsync();
}

[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task ReceiveDeserializedAsync_WithAutoCommitAfterReceive_Should_StoreOffset(Protocol protocol)
{
var client = protocol == Protocol.Tcp
? await Fixture.CreateTcpClient()
: await Fixture.CreateHttpClient();

var testStream = await CreateTestStreamWithMessages(client, protocol);

var consumerId = protocol == Protocol.Tcp ? 202 : 302;
IggyConsumer<string> consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId),
AutoCommitMode.AfterReceive, new Utf8StringDeserializer(),
PollingStrategy.First());

await consumer.InitAsync();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var count = 0;

await foreach (ReceivedMessage<string> _ in consumer.ReceiveDeserializedAsync(cts.Token))
{
count++;
if (count >= 5)
{
break;
}
}

await consumer.DisposeAsync();

var offset = await client.GetOffsetAsync(Consumer.New(consumerId),
Identifier.String(testStream.StreamId),
Identifier.String(testStream.TopicId),
1u);

offset.ShouldNotBeNull();
offset.StoredOffset.ShouldBe(3ul);
}

[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task ReceiveDeserializedAsync_WithFailingDeserializer_Should_YieldDeserializationFailed(
Protocol protocol)
{
var client = protocol == Protocol.Tcp
? await Fixture.CreateTcpClient()
: await Fixture.CreateHttpClient();

var testStream = await CreateTestStreamWithMessages(client, protocol);

var consumerId = protocol == Protocol.Tcp ? 203 : 303;
IggyConsumer<string> consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId),
AutoCommitMode.Disabled, new FailingDeserializer());

await consumer.InitAsync();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await foreach (ReceivedMessage<string> msg in consumer.ReceiveDeserializedAsync(cts.Token))
{
msg.Status.ShouldBe(MessageStatus.DeserializationFailed);
msg.Data.ShouldBeNull();
msg.Error.ShouldNotBeNull();
msg.Error.ShouldBeOfType<InvalidOperationException>();
break;
}

await consumer.DisposeAsync();
}

[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task ReceiveDeserializedAsync_Should_StopCleanly_OnCancellation(Protocol protocol)
{
var client = protocol == Protocol.Tcp
? await Fixture.CreateTcpClient()
: await Fixture.CreateHttpClient();

var testStream = await CreateTestStreamWithMessages(client, protocol);

var consumerId = protocol == Protocol.Tcp ? 204 : 304;
IggyConsumer<string> consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId),
AutoCommitMode.Disabled, new Utf8StringDeserializer());

await consumer.InitAsync();

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));

await Should.NotThrowAsync(async () =>
{
try
{
await foreach (ReceivedMessage<string> _ in consumer.ReceiveDeserializedAsync(cts.Token))
{
}
}
catch (OperationCanceledException)
{
}
});

await consumer.DisposeAsync();
}

private static IggyConsumer<string> BuildTypedConsumer(IIggyClient client,
TestStreamInfo stream,
Consumer consumer,
AutoCommitMode autoCommitMode,
IDeserializer<string> deserializer,
PollingStrategy? pollingStrategy = null)
{
var config = new IggyConsumerConfig<string>
{
StreamId = Identifier.String(stream.StreamId),
TopicId = Identifier.String(stream.TopicId),
Consumer = consumer,
PollingStrategy = pollingStrategy ?? PollingStrategy.Next(),
BatchSize = 10,
PartitionId = 1,
AutoCommitMode = autoCommitMode,
AutoCommit = autoCommitMode != AutoCommitMode.Disabled,
PollingIntervalMs = 0,
Deserializer = deserializer
};
return new IggyConsumer<string>(client, config, NullLoggerFactory.Instance);
}

private async Task<TestStreamInfo> CreateTestStreamWithMessages(IIggyClient client, Protocol protocol,
uint partitionsCount = 5, int messagesPerPartition = 100)
{
var streamId = $"typed_stream_{Guid.NewGuid()}_{protocol.ToString().ToLowerInvariant()}";
var topicId = "test_topic";

await client.CreateStreamAsync(streamId);
await client.CreateTopicAsync(Identifier.String(streamId), topicId, partitionsCount);

for (uint partitionId = 0; partitionId < partitionsCount; partitionId++)
{
var messages = new List<Message>();
for (var i = 0; i < messagesPerPartition; i++)
{
messages.Add(new Message(Guid.NewGuid(),
Encoding.UTF8.GetBytes($"Test message {i} for partition {partitionId}")));
}

await client.SendMessagesAsync(Identifier.String(streamId),
Identifier.String(topicId),
Partitioning.PartitionId((int)partitionId),
messages);
}

return new TestStreamInfo(streamId, topicId, partitionsCount, messagesPerPartition);
}

private record TestStreamInfo(string StreamId, string TopicId, uint PartitionsCount, int MessagesPerPartition);

private sealed class Utf8StringDeserializer : IDeserializer<string>
{
public string Deserialize(ReadOnlyMemory<byte> data)
{
return Encoding.UTF8.GetString(data.Span);
}
}

private sealed class FailingDeserializer : IDeserializer<string>
{
public string Deserialize(ReadOnlyMemory<byte> data)
{
throw new InvalidOperationException("Intentional deserialization failure");
}
}
}
2 changes: 1 addition & 1 deletion foreign/csharp/Iggy_SDK.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@


Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Iggy_SDK", "Iggy_SDK\Iggy_SDK.csproj", "{661540EB-81F9-492C-828B-CF787BEBD50B}"
EndProject
Expand Down
22 changes: 9 additions & 13 deletions foreign/csharp/Iggy_SDK/Consumers/IDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace Apache.Iggy.Consumers;
/// Interface for deserializing message payloads from byte arrays to type T.
/// <para>
/// No type constraints are enforced on T to provide maximum flexibility.
/// Implementations are responsible for ensuring that the provided byte data can be properly deserialized to the target type.
/// Implementations are responsible for ensuring that the provided byte data can be properly deserialized to the
/// target type.
/// </para>
/// </summary>
/// <typeparam name="T">
Expand All @@ -37,18 +38,13 @@ namespace Apache.Iggy.Consumers;
public interface IDeserializer<out T>
{
/// <summary>
/// Deserializes a byte array into an instance of type T.
/// Deserializes a read-only memory into an instance of type T. Callers may pass a <c>byte[]</c> directly
/// thanks to the implicit conversion to <see cref="ReadOnlyMemory{T}" />.
/// </summary>
/// <param name="data">The byte array containing the serialized data to deserialize.</param>
/// <param name="data">
/// Read-only memory containing the serialized data. The implementation MUST NOT retain a reference to
/// the span after returning.
/// </param>
/// <returns>An instance of type T representing the deserialized data.</returns>
/// <exception cref="System.FormatException">
/// Thrown when the data format is invalid and cannot be deserialized.
/// </exception>
/// <exception cref="System.ArgumentException">
/// Thrown when the data cannot be deserialized due to invalid content or structure.
/// </exception>
/// <exception cref="System.InvalidOperationException">
/// Thrown when the deserialization operation fails due to state issues.
/// </exception>
T Deserialize(byte[] data);
T Deserialize(ReadOnlyMemory<byte> data);
}
Loading
Loading