System.InvalidOperationException: Cannot resolve scoped service 'SlimMessageBus.Host.ConsumerContext' from root provider. #405
Replies: 3 comments 5 replies
-
There is an issue with SlimMessageBus's handling of the scoped ConsumerContext service. Could someone please let me know how this issue can be resolved? |
Beta Was this translation helpful? Give feedback.
-
@manish-doshi I looked at your sample and have changed it a bit to make it work:
Somehow, the Here is the adjusted sample to create a scope inside the HostedService: using SlimMessageBus;
using SlimMessageBus.Host;
using SlimMessageBus.Host.Memory;
namespace TestSlimMessageBus
{
public class Program
{
// Provide your event hub-names OR kafka/service bus topic names
public const string topicForAddCommand = "add-command";
public const string topicForMultiplyRequest = "multiply-request";
//const string topicForResponses = "responses";
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
//builder.Services.AddSwaggerGen();
builder.Services.AddSlimMessageBus(mbb =>
{
// Configure your bus here...
mbb.WithProviderMemory(cfg =>
{
// Configure the in-memory provider here if needed
cfg.EnableBlockingPublish = false;
});
mbb.Produce<AddCommand>(x => x.DefaultTopic(topicForAddCommand));
mbb.Consume<AddCommand>(x => x.Topic(topicForAddCommand)
.WithConsumer<AddCommandConsumer>());
mbb.Produce<MultiplyRequest>(x => x.DefaultTopic(topicForMultiplyRequest));
mbb.Handle<MultiplyRequest, MultiplyResponse>(x => x.Topic(topicForMultiplyRequest)
.WithHandler<MultiplyRequestHandler>());
//mbb.AddServicesFromAssemblyContaining<Program>(); // Recommned using this instead of manually adding the consumer / handler (further down).
});
builder.Services.AddHostedService<ApplicationService>();
builder.Services.AddTransient<AddCommandConsumer>();
builder.Services.AddTransient<MultiplyRequestHandler>();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
//app.UseSwagger();
//app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
}
}
public class ApplicationService : IHostedService
{
private readonly Random _random = new();
private readonly IServiceProvider _serviceProviderRoot;
private IServiceScope _serviceScope;
private bool _canRun = true;
private IMessageBus _messageBus;
public ApplicationService(IServiceProvider serviceProvider)
{
_serviceProviderRoot = serviceProvider;
}
public Task StartAsync(CancellationToken cancellationToken)
{
// Create a scope for this hosted service to perform its logic
_serviceScope = _serviceProviderRoot.CreateScope();
_messageBus = _serviceScope.ServiceProvider.GetService<IMessageBus>();
var addTask = Task.Factory.StartNew(AddLoop, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
var multiplyTask = Task.Factory.StartNew(MultiplyLoop, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_canRun = false;
if (_serviceScope is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync();
}
}
private async Task AddLoop()
{
while (_canRun)
{
var a = _random.Next(100);
var b = _random.Next(100);
Console.WriteLine("Producer: Sending numbers {0} and {1}", a, b);
try
{
await _messageBus.Publish(new AddCommand(a, b));
}
catch (Exception ex)
{
Console.WriteLine("Producer: publish error {0}", ex.Message);
}
await Task.Delay(1000); // Simulate some delay
}
}
private async Task MultiplyLoop()
{
while (_canRun)
{
var a = _random.Next(100);
var b = _random.Next(100);
Console.WriteLine("Sender: Sending numbers {0} and {1}", a, b);
try
{
var response = await _messageBus.Send(new MultiplyRequest(a, b));
Console.WriteLine("Sender: Got response back with result {0}", response.Result);
}
catch (Exception e)
{
Console.WriteLine("Sender: request error or timeout: " + e);
}
await Task.Delay(1000); // Simulate some work
}
}
}
public record AddCommand(int Left, int Right);
public class AddCommandConsumer : IConsumer<AddCommand>
{
public async Task OnHandle(AddCommand message, CancellationToken cancellationToken)
{
Console.WriteLine("Consumer: Adding {0} and {1} gives {2}", message.Left, message.Right, message.Left + message.Right);
// Context.Headers -> has the headers
await Task.Delay(50, cancellationToken); // Simulate some work
}
}
public record MultiplyRequest(int Left, int Right) : IRequest<MultiplyResponse>;
public record MultiplyResponse(int Result);
public class MultiplyRequestHandler : IRequestHandler<MultiplyRequest, MultiplyResponse>
{
public async Task<MultiplyResponse> OnHandle(MultiplyRequest request, CancellationToken cancellationToken)
{
await Task.Delay(50, cancellationToken); // Simulate some work
return new(request.Left * request.Right);
}
}
} Furthermore, I encourage you to configure this using the automatic consumer discovery to make it more succinct:
builder.Services.AddSlimMessageBus(mbb =>
{
// Configure your bus here...
mbb.WithProviderMemory(cfg =>
{
// Configure the in-memory provider here if needed
cfg.EnableBlockingPublish = false;
});
mbb.Produce<AddCommand>(x => x.DefaultTopic(topicForAddCommand));
mbb.Consume<AddCommand>(x => x.Topic(topicForAddCommand));
mbb.Produce<MultiplyRequest>(x => x.DefaultTopic(topicForMultiplyRequest));
mbb.Handle<MultiplyRequest, MultiplyResponse>(x => x.Topic(topicForMultiplyRequest));
mbb.AddServicesFromAssemblyContaining<Program>();
});
builder.Services.AddHostedService<ApplicationService>(); Please let me know if it worked for you. |
Beta Was this translation helpful? Give feedback.
-
Thanks @zarusz, your suggestion worked. Consider the following code. In this, RequesterService is a singleton and, for the sake of discussion, SendMessageAsync is called later from somewhere. Shall I create the scope in the constructor as below? `
` |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I am trying to create in-process, in-memory messaging using SimpleMessageBus
Attached is my Asp.Net test application. I am using SlimMessageBus.Host.Memory version 3.2.0 NuGet package. I have created this from the sample console app provided in this repository.
Program.zip
When I run this, I get the following error while running the request-response use case, invoking MultiplyRequestHandler
Sender: request error or timeout: System.InvalidOperationException: Cannot resolve scoped service 'SlimMessageBus.Host.ConsumerContext' from root provider.
at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteValidator.ValidateResolution(ServiceCallSite callSite, IServiceScope scope, IServiceScope rootScope)
at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(ServiceIdentifier serviceIdentifier, ServiceProviderEngineScope serviceProviderEngineScope)
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at SlimMessageBus.Host.MessageHandler.CreateConsumerContext(IMessageScope messageScope, IReadOnlyDictionary
2 messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, Object transportMessage, Object consumerInstance, IMessageBus messageBus, IDictionary
2 consumerContextProperties, CancellationToken cancellationToken)at SlimMessageBus.Host.MessageProcessor
1.CreateConsumerContext(IMessageScope messageScope, IReadOnlyDictionary
2 messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, Object transportMessage, Object consumerInstance, IMessageBus messageBus, IDictionary2 consumerContextProperties, CancellationToken cancellationToken) at SlimMessageBus.Host.MessageHandler.DoHandle(Object message, IReadOnlyDictionary
2 messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, Object transportMessage, IDictionary2 consumerContextProperties, IServiceProvider currentServiceProvider, CancellationToken cancellationToken) at SlimMessageBus.Host.Memory.MemoryMessageBus.ProduceInternal[TResponseMessage](Object message, String path, IDictionary
2 requestHeaders, IMessageBusTarget targetBus, Boolean isPublish, CancellationToken cancellationToken)at SlimMessageBus.Host.MessageBusBase.ProduceSend[TResponse](Object request, String path, IDictionary
2 headers, Nullable
1 timeout, IMessageBusTarget targetBus, CancellationToken cancellationToken)at TestSlimMessageBus.ApplicationService.MultiplyLoop() in C:\TestCode\2022\TestSlimMessageBus\TestSlimMessageBus\Program.cs:line 125
How to fix this?
Beta Was this translation helpful? Give feedback.
All reactions