Skip to content

HandlerInvoker occasionally fails to invoke MessageHandlers (with same SubscriberMapping.MessageType)ย #284

@will-molloy

Description

@will-molloy

Describe the bug

HandlerInvoker occasionally fails to invoke MessageHandlers (with same SubscriberMapping.MessageType):

System.Reflection.TargetException: Object does not match target type.
   at System.Reflection.MethodInvokerCommon.ValidateInvokeTarget(Object target, MethodBase method)
   at System.Reflection.RuntimeMethodInfo.Invoke(Object obj, BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
   at AWS.Messaging.Services.HandlerInvoker.InvokeAsync(MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token)

I'm assuming it's code:

var method = _handlerMethods.GetOrAdd(subscriberMapping.MessageType, x =>
{
return subscriberMapping.HandlerType.GetMethod( // Look up the method on the handler type with:
nameof(IMessageHandler<MessageProcessStatus>.HandleAsync), // name "HandleAsync"
new Type[] { messageEnvelope.GetType(), typeof(CancellationToken) }); // parameters (MessageEnvelope<MessageType>, CancellationToken)
});
if (method == null)
{
_logger.LogError("Unable to resolve a compatible HandleAsync method for {HandlerType} while handling message ID {MessageEnvelopeId}.", subscriberMapping.HandlerType, messageEnvelope.Id);
throw new InvalidMessageHandlerSignatureException($"Unable to resolve a compatible HandleAsync method for {subscriberMapping.HandlerType} while handling message ID {messageEnvelope.Id}.");
}
try
{
var task = method.Invoke(handler, new object[] { messageEnvelope, token }) as Task<MessageProcessStatus>;

Note I'm using a custom EnvelopeSerializer and hence SubscriberMapping (added context below).

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected Behavior

The MessageHandlers are invoked successfully for every message and every retry attempt.

Current Behavior

The MessageHandlers are sometimes not invoked and produce the error shown above.

This could happen on the first attempt, a subsequent retry, or for all retry attempts of a message.

Reproduction Steps

This seems to only happen under load.

Register a custom IEnvelopeSerializer which creates SubscriberMapping with the same MessageType, but different HandlerType.

For example:

        SubscriberMapping mapping;
        if (s3Metadata.Prefix.StartsWith("inbox/"))
        {
            mapping = SubscriberMapping.Create<InboxMessageHandler, S3Metadata>("inbox");
        }
        else if (s3Metadata.Prefix.StartsWith("failed/"))
        {
            mapping = SubscriberMapping.Create<FailedMessageHandler, S3Metadata>("failed");
        }

This seems to create a race on the HandlerInvoker._handlerMethods cache, where it retrieves the method for a different handler:

var method = _handlerMethods.GetOrAdd(subscriberMapping.MessageType, x =>
{
return subscriberMapping.HandlerType.GetMethod( // Look up the method on the handler type with:
nameof(IMessageHandler<MessageProcessStatus>.HandleAsync), // name "HandleAsync"
new Type[] { messageEnvelope.GetType(), typeof(CancellationToken) }); // parameters (MessageEnvelope<MessageType>, CancellationToken)
});
if (method == null)
{
_logger.LogError("Unable to resolve a compatible HandleAsync method for {HandlerType} while handling message ID {MessageEnvelopeId}.", subscriberMapping.HandlerType, messageEnvelope.Id);
throw new InvalidMessageHandlerSignatureException($"Unable to resolve a compatible HandleAsync method for {subscriberMapping.HandlerType} while handling message ID {messageEnvelope.Id}.");
}
try
{
var task = method.Invoke(handler, new object[] { messageEnvelope, token }) as Task<MessageProcessStatus>;

Possible Solution

Instead of subscriberMapping.MessageType, use subscriberMapping.MessageTypeIdentifier as the cache key.

Alternatively it could cache both the handler and method?

Additional Information/Context

I'm using this library to consume 2 SQS queues (in the same app). These queues consume S3 events (i.e. produced by AWS).

Given these messages are not in CloudEvents format, I created a custom IEnvelopeSerializer as per this issue: #141

This custom IEnvelopeSerializer inspects the S3 prefix and routes the message to the appropriate handler.

For example:

    public ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message message)
    {
        var s3Metadata = ExtractS3Metadata(message);

        SubscriberMapping mapping;
        if (s3Metadata.Prefix.StartsWith("inbox/"))
        {
            mapping = SubscriberMapping.Create<InboxMessageHandler, S3Metadata>("inbox");
        }
        else if (s3Metadata.Prefix.StartsWith("failed/"))
        {
            mapping = SubscriberMapping.Create<FailedMessageHandler, S3Metadata>("failed");
        }
        else
        {
            throw new AmazonSQSException($"Unexpected prefix: {s3Metadata.Prefix}");
        }

        var messageEnvelope = new MessageEnvelope<S3Metadata>
        {
            Id = message.MessageId,
            Message = s3Metadata,
            SQSMetadata = ExtractSqsMetadata(message),
        };
        return ValueTask.FromResult(new ConvertToEnvelopeResult(messageEnvelope, mapping));
    }

Please let me know if this is incorrect usage of the library ๐Ÿ™

AWS.Messaging (or related) package versions

AWS.Messaging 1.0.1

Targeted .NET Platform

.NET 8

Operating System and version

Linux (dotnet/aspnet:8.0 container)

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingp1This is a high priority issuequeued

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions