Skip to content

Inconsistent deserialization of untyped object parameters across IMessageSerializer implementations #2249

@nkz-soft

Description

@nkz-soft

Description

I am working on a task that requires passing untyped parameters within a message. Specifically, in my RoutingSlip implementation, I need to accept a dictionary of parameters of various types: IDictionary<string, object>.

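The problem is that different IMessageSerializer implementations produce different runtime representations for a value declared as object. Furthermore, MemoryPack does not appear to support untyped (object) values at all, which is why the MemoryPack test message below declares Value as int instead.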

Reproduction Steps / Test Results

I created a test to verify how different serializers handle untyped object values.

/// <summary>
/// Test message whose <c>Value</c> is declared as <see cref="object"/>, i.e. an
/// untyped payload. Used by the default-serializer test in this file.
/// </summary>
/// <param name="Number">String field of the message.</param>
/// <param name="Value">Untyped payload; its runtime type is what the test inspects.</param>
public record JsonTestMessage(
    string Number,
    object Value);

/// <summary>
/// Test message for the MemoryPack serializer test. Unlike the other test
/// messages in this file, <c>Value</c> is declared as <see cref="int"/> rather
/// than <see cref="object"/>.
/// </summary>
/// <param name="Number">String field of the message.</param>
/// <param name="Value">Typed integer payload.</param>
[MemoryPackable]
public partial record MemoryPackTestMessage(string Number, int Value);

/// <summary>
/// Test message for the MessagePack serializer test. Both generated properties
/// carry an explicit <c>Key</c> index (0 for <c>Number</c>, 1 for <c>Value</c>),
/// and <c>Value</c> is declared as <see cref="object"/>.
/// </summary>
/// <param name="Number">String field of the message (key 0).</param>
/// <param name="Value">Untyped payload (key 1).</param>
[MessagePackObject]
public record MessagePackTestMessage(
    [property: Key(0)] string Number,
    [property: Key(1)] object Value);

/// <summary>
/// Test message for the Protobuf serializer test. Declared as a mutable class
/// with a <c>[ProtoContract]</c> attribute; only <c>Number</c> is tagged with
/// <c>[ProtoMember]</c>.
/// </summary>
[ProtoContract]
public class ProtoBufNetTestMessage
{
    /// <summary>String field of the message (proto member 1).</summary>
    [ProtoMember(1)]
    public string Number { get; set; } = string.Empty;

    /// <summary>Untyped payload.</summary>
    // NOTE(review): this property has no [ProtoMember] attribute, so it is
    // presumably not part of the serialized contract — confirm whether that is
    // intended. It is initialised with null!, so it is null at runtime until
    // something assigns it.
    public object Value { get; set; } = null!;
}

/// <summary>
/// Handlers for the four test message types. Each handler writes the runtime
/// type of the received <c>Value</c> to the console so the representation
/// produced by each serializer can be compared.
/// </summary>
/// <remarks>
/// A null <c>Value</c> is reported as <c>&lt;null&gt;</c> instead of throwing a
/// <see cref="NullReferenceException"/>; <c>ProtoBufNetTestMessage.Value</c>, for
/// example, defaults to null.
/// </remarks>
public static class TestMessageHandler
{
    /// <summary>Reports the runtime type of <c>message.Value</c> for a <c>JsonTestMessage</c>.</summary>
    /// <param name="message">The received message.</param>
    /// <param name="ct">Cancellation token (not used by this handler).</param>
    /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
    public static ValueTask HandleAsync(JsonTestMessage message, CancellationToken ct)
        => ReportValueType("JsonTestMessage", message.Value);

    /// <summary>Reports the runtime type of <c>message.Value</c> for a <c>MemoryPackTestMessage</c>.</summary>
    /// <param name="message">The received message.</param>
    /// <param name="ct">Cancellation token (not used by this handler).</param>
    /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
    public static ValueTask HandleAsync(MemoryPackTestMessage message, CancellationToken ct)
        => ReportValueType("MemoryPackTestMessage", message.Value);

    /// <summary>Reports the runtime type of <c>message.Value</c> for a <c>MessagePackTestMessage</c>.</summary>
    /// <param name="message">The received message.</param>
    /// <param name="ct">Cancellation token (not used by this handler).</param>
    /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
    public static ValueTask HandleAsync(MessagePackTestMessage message, CancellationToken ct)
        => ReportValueType("MessagePackTestMessage", message.Value);

    /// <summary>Reports the runtime type of <c>message.Value</c> for a <c>ProtoBufNetTestMessage</c>.</summary>
    /// <param name="message">The received message.</param>
    /// <param name="ct">Cancellation token (not used by this handler).</param>
    /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
    public static ValueTask HandleAsync(ProtoBufNetTestMessage message, CancellationToken ct)
        => ReportValueType("ProtoBufNetTestMessage", message.Value);

    /// <summary>
    /// Writes "<paramref name="messageName"/>:  &lt;type full name&gt;" to the console.
    /// </summary>
    /// <param name="messageName">Label printed before the type name.</param>
    /// <param name="value">The received value; may be null.</param>
    /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
    private static ValueTask ReportValueType(string messageName, object? value)
    {
        // Null-safe: a missing value is reported rather than crashing the handler.
        var typeName = value?.GetType().FullName ?? "<null>";

        // Two spaces after the colon are kept to match the original output format.
        Console.WriteLine($"{messageName}:  {typeName}");
        return ValueTask.CompletedTask;
    }
}

/// <summary>
/// Round-trip tests that send a test message from a publishing host to a
/// listening host over a local port and assert on the runtime type/value of
/// the message's <c>Value</c> as received, once per serializer configuration.
/// </summary>
/// <remarks>
/// Both hosts are held in <c>using</c> declarations so they are disposed even
/// when the second host fails to start or an assertion throws, and they are
/// stopped through <see cref="StopHostsAsync"/> so that a failure stopping the
/// publisher does not skip stopping the subscriber.
/// </remarks>
public class VWMessageTypeTests
{
    /// <summary>
    /// With no serializer configured, asserts that an <c>object</c> value is
    /// received as <see cref="System.Text.Json.JsonElement"/>.
    /// </summary>
    [Fact]
    public async Task DefaultSerializerUsesJsonElementForObjectValues()
    {
        var subHostPort = PortFinder.GetAvailablePort();

        using var pubHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.PublishMessage<JsonTestMessage>().ToPort(subHostPort);
        }).StartAsync();

        using var subHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.ListenAtPort(subHostPort);
        }).StartAsync();

        try
        {
            var message = new JsonTestMessage("1", 1);
            var session = await pubHost.TrackActivity().AlsoTrack(subHost).SendMessageAndWaitAsync(message);

            var received = session.Received.MessagesOf<JsonTestMessage>().ToList();
            Assert.Single(received);
            Assert.IsType<System.Text.Json.JsonElement>(received[0].Value);
        }
        finally
        {
            await StopHostsAsync(pubHost, subHost);
        }
    }

    /// <summary>
    /// With MemoryPack serialization enabled on both hosts, asserts that the
    /// <c>int</c> value arrives as <c>1</c>.
    /// </summary>
    [Fact]
    public async Task MemoryPackSerializerPreservesConcreteIntValue()
    {
        var subHostPort = PortFinder.GetAvailablePort();

        using var pubHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.UseMemoryPackSerialization();
            opts.PublishMessage<MemoryPackTestMessage>().ToPort(subHostPort);
        }).StartAsync();

        using var subHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.UseMemoryPackSerialization();
            opts.ListenAtPort(subHostPort);
        }).StartAsync();

        try
        {
            var message = new MemoryPackTestMessage("1", 1);
            var session = await pubHost.TrackActivity().AlsoTrack(subHost).SendMessageAndWaitAsync(message);

            var received = session.Received.MessagesOf<MemoryPackTestMessage>().ToList();
            Assert.Single(received);
            Assert.Equal(1, received[0].Value);
            Assert.IsType<int>(received[0].Value);
        }
        finally
        {
            await StopHostsAsync(pubHost, subHost);
        }
    }

    /// <summary>
    /// With MessagePack serialization enabled on both hosts, asserts that an
    /// <c>object</c> value sent as <c>1</c> is received as an <c>int</c> equal to 1.
    /// </summary>
    [Fact]
    public async Task MessagePackSerializerPreservesConcreteIntValue()
    {
        var subHostPort = PortFinder.GetAvailablePort();

        using var pubHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.UseMessagePackSerialization();
            opts.PublishMessage<MessagePackTestMessage>().ToPort(subHostPort);
        }).StartAsync();

        using var subHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.UseMessagePackSerialization();
            opts.ListenAtPort(subHostPort);
        }).StartAsync();

        try
        {
            var message = new MessagePackTestMessage("1", 1);
            var session = await pubHost.TrackActivity().AlsoTrack(subHost).SendMessageAndWaitAsync(message);

            var received = session.Received.MessagesOf<MessagePackTestMessage>().ToList();
            Assert.Single(received);
            Assert.Equal(1, received[0].Value);
            Assert.IsType<int>(received[0].Value);
        }
        finally
        {
            await StopHostsAsync(pubHost, subHost);
        }
    }

    /// <summary>
    /// With Protobuf serialization configured on the publishing and listening
    /// endpoints, asserts that <c>Number</c> and <c>Value</c> round-trip.
    /// </summary>
    // NOTE(review): the results reported alongside this code say the ProtoBuf
    // case throws WolverineSerializationException ("Message must implement
    // IMessage"), so this test is expected to fail as written.
    [Fact]
    public async Task ProtoBufNetSerializerPreservesConcreteIntValue()
    {
        var subHostPort = PortFinder.GetAvailablePort();

        using var pubHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.PublishMessage<ProtoBufNetTestMessage>().ToPort(subHostPort).UseProtobufSerialization();
        }).StartAsync();

        using var subHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
        {
            opts.ListenAtPort(subHostPort).UseProtobufSerialization();
        }).StartAsync();

        try
        {
            var message = new ProtoBufNetTestMessage { Number = "1", Value = 1 };
            var session = await pubHost.TrackActivity().AlsoTrack(subHost).SendMessageAndWaitAsync(message);

            var received = session.Received.MessagesOf<ProtoBufNetTestMessage>().ToList();
            Assert.Single(received);
            Assert.Equal("1", received[0].Number);
            Assert.Equal(1, received[0].Value);
            Assert.IsType<int>(received[0].Value);
        }
        finally
        {
            await StopHostsAsync(pubHost, subHost);
        }
    }

    /// <summary>
    /// Stops the publishing host and then the subscribing host. The subscriber
    /// is stopped from a <c>finally</c> block so it is not skipped when stopping
    /// the publisher throws.
    /// </summary>
    /// <param name="pubHost">The publishing host.</param>
    /// <param name="subHost">The listening host.</param>
    private static async Task StopHostsAsync(IHost pubHost, IHost subHost)
    {
        try
        {
            await pubHost.StopAsync();
        }
        finally
        {
            await subHost.StopAsync();
        }
    }
}

Below are the results I observed during deserialization:

  • Json: System.Text.Json.JsonElement
  • MemoryPack: System.Int32
  • MessagePack: System.Int32
  • ProtoBuf: Throws an exception
    Wolverine.WolverineSerializationException: Error trying to serialize message of type WVMessageFormatTest.ProtoBufNetTestMessage with serializer Wolverine.Protobuf.Internal.ProtobufMessageSerializer
      ---> System.ArgumentException: Message must implement IMessage (Parameter 'message')
    

Current Behavior

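Currently, my RoutingSlip implementation is forced to account for the specific runtime representation that each IMessageSerializer implementation produces for object values after deserialization (for example, JsonElement with the default JSON serializer versus Int32 with MessagePack). This creates tight coupling between the routing logic and the specific serializer being used, which defeats the purpose of the abstraction.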

Expected Behavior

Handlers should receive native .NET types regardless of the underlying serializer used. The serialization details should be abstracted away.

Proposed Solution

I believe we need an abstraction layer that normalizes the differences in serialization. This layer would ensure that by the time the data reaches the handlers, it has been converted to standard .NET types.

What are your thoughts on this approach?

Metadata

Metadata

Assignees

No one assigned

    Labels

    wontfix — This will not be worked on

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions