Auto Subscriber
AzureNetQ comes with a simple AutoSubscriber. You can use it to scan a specific assembly for classes that implement either of the interfaces IConsume<T> or IConsumeAsync<T>, and then let the auto subscriber subscribe these consumers to your bus.
An implementation of IConsume<T> is subscribed through the bus's Subscribe method, whilst an implementation of IConsumeAsync<T> is subscribed through the bus's SubscribeAsync method; see Subscribe for details. A single consumer class can handle multiple message types by implementing the interface once per type.
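To give a feel for what the assembly scan involves, here is a minimal reflection sketch. It is not AzureNetQ's actual implementation: the IConsume<T> and IConsumeAsync<T> interfaces below are local stand-ins so the example compiles on its own, and ConsumerScanner is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

// Stand-ins for the library's interfaces (assumption: same shape as in AzureNetQ).
public interface IConsume<in T> where T : class { void Consume(T message); }
public interface IConsumeAsync<in T> where T : class { Task Consume(T message); }

public class MessageA { }
public class MessageB { }

public class MyConsumer : IConsume<MessageA>, IConsumeAsync<MessageB>
{
    public void Consume(MessageA message) { }
    public Task Consume(MessageB message) => Task.CompletedTask;
}

// Hypothetical helper: lists every (consumer, message type) pair in an assembly.
public static class ConsumerScanner
{
    public static List<(Type Consumer, Type Message, bool IsAsync)> Scan(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .SelectMany(t => t.GetInterfaces()
                .Where(i => i.IsGenericType &&
                            (i.GetGenericTypeDefinition() == typeof(IConsume<>) ||
                             i.GetGenericTypeDefinition() == typeof(IConsumeAsync<>)))
                .Select(i => (Consumer: t,
                              Message: i.GetGenericArguments()[0],
                              IsAsync: i.GetGenericTypeDefinition() == typeof(IConsumeAsync<>))))
            .ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        // One line per consumer interface found in this assembly.
        foreach (var hit in ConsumerScanner.Scan(Assembly.GetExecutingAssembly()))
            Console.WriteLine($"{hit.Consumer.Name} handles {hit.Message.Name} (async: {hit.IsAsync})");
    }
}
```

A consumer that implements the interface for several message types shows up once per type, which is why one class can handle multiple messages.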
You can also use the auto subscriber to call the Respond/RespondAsync methods of the bus used in request/response. This is achieved using the IRespond<TRequest, TResponse> and IRespondAsync<TRequest, TResponse> interfaces, where the type parameters represent the request and response message types respectively.
All the AutoSubscriber classes are in the AzureNetQ.AutoSubscribe namespace, so you'll need to add the following using statement:
using AzureNetQ.AutoSubscribe;
Let's define a simple consumer, handling three messages: MessageA, MessageB and MessageC.
public class MyConsumer : IConsume<MessageA>, IConsume<MessageB>, IConsumeAsync<MessageC>
{
    public void Consume(MessageA message) {...}
    public void Consume(MessageB message) {...}
    public Task Consume(MessageC message) {...}
}
First create a new instance of AutoSubscriber, passing your IBus instance. To register this consumer, and all other consumers in the same assembly, pass the assembly that contains them to AutoSubscriber.Subscribe(assembly). Note: you should only do this ONCE, preferably at application start-up.
var subscriber = new AutoSubscriber(bus);
subscriber.Subscribe(Assembly.GetExecutingAssembly());
Please note that this feature is not yet implemented in AzureNetQ.
By default the AutoSubscriber will bind without topics. In the example below MessageA is registered with two topics.
public class MyConsumer : IConsume<MessageA>, IConsume<MessageB>, IConsumeAsync<MessageC>
{
    [ForTopic("Topic.Foo")]
    [ForTopic("Topic.Bar")]
    public void Consume(MessageA message) {...}
    public void Consume(MessageB message) {...}
    public Task Consume(MessageC message) {...}
}
// To publish by topic
// (assumes the Publish(message, topic) overload described under Topic Based Routing)
var bus = RabbitHutch.CreateBus("host=localhost");
bus.Publish(new MessageA(), "Topic.Foo"); // picked up
bus.Publish(new MessageA(), "Topic.Bar"); // picked up
bus.Publish(new MessageA());              // not picked up
By default the AutoSubscriber will use the default Subscription. If you would like to change it, you can decorate the Consume method with the SubscriptionAttribute. You can read more about subscription names here.
Let's say the consumer above should have a custom Subscription for the Consume method that handles MessageB. Just decorate the method and define a value for Name.
[Subscription(Name = "MyExplicitId")]
public void Consume(MessageB message) { }
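The lookup implied here can be sketched with plain reflection: find the Consume overload for a message type, read its attribute, and fall back to a default when none is present. This is an illustration only, not the library's code. SubscriptionAttribute below is a local stand-in that models just the Name property, and both the SubscriptionNames helper and the "default" fallback value are hypothetical.

```csharp
using System;
using System.Reflection;

// Stand-in for the library's attribute; only the Name property from the text is modelled.
[AttributeUsage(AttributeTargets.Method)]
public sealed class SubscriptionAttribute : Attribute
{
    public string Name { get; set; }
}

public class MessageA { }
public class MessageB { }

public class MyConsumer
{
    public void Consume(MessageA message) { }

    [Subscription(Name = "MyExplicitId")]
    public void Consume(MessageB message) { }
}

// Hypothetical helper that picks the subscription name for one Consume overload.
public static class SubscriptionNames
{
    // Hypothetical fallback; the real default is whatever the library chooses.
    public const string Fallback = "default";

    public static string For(Type consumerType, Type messageType)
    {
        // Select the Consume overload whose single parameter is the message type.
        MethodInfo method = consumerType.GetMethod("Consume", new[] { messageType });
        var attribute = method?.GetCustomAttribute<SubscriptionAttribute>();
        return string.IsNullOrEmpty(attribute?.Name) ? Fallback : attribute.Name;
    }
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(SubscriptionNames.For(typeof(MyConsumer), typeof(MessageA))); // prints "default"
        Console.WriteLine(SubscriptionNames.For(typeof(MyConsumer), typeof(MessageB))); // prints "MyExplicitId"
    }
}
```

Because the attribute sits on the method rather than the class, each message type handled by the same consumer can carry its own subscription name.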
### Using an IoC container with AutoSubscriber
AutoSubscriber has a property, MessageDispatcher, which allows you to plug in your own message dispatching code. This allows you to resolve your consumers from an IoC container or do other custom dispatch time tasks.
Let's write a custom IAutoSubscriberMessageDispatcher to resolve consumers from the Windsor IoC container:
using System.Threading.Tasks;
using AzureNetQ.AutoSubscribe;
using Castle.Windsor;

public class WindsorMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly IWindsorContainer container;

    public WindsorMessageDispatcher(IWindsorContainer container)
    {
        this.container = container;
    }

    public void Dispatch<TMessage, TConsumer>(TMessage message) where TMessage : class where TConsumer : IConsume<TMessage>
    {
        // Resolve a consumer for this message and always release it afterwards.
        var consumer = container.Resolve<TConsumer>();
        try
        {
            consumer.Consume(message);
        }
        finally
        {
            container.Release(consumer);
        }
    }

    public Task DispatchAsync<TMessage, TConsumer>(TMessage message) where TMessage : class where TConsumer : IConsumeAsync<TMessage>
    {
        var consumer = container.Resolve<TConsumer>();
        // Release the consumer once the asynchronous Consume call has finished.
        return consumer.Consume(message).ContinueWith(t => container.Release(consumer));
    }
}
Now we need to register our consumer with our IoC container:
var container = new WindsorContainer();
container.Register(
    Component.For<MyConsumer>().ImplementedBy<MyConsumer>()
);
Next, set up the AutoSubscriber with our custom IAutoSubscriberMessageDispatcher:
var bus = RabbitHutch.CreateBus("host=localhost");
var autoSubscriber = new AutoSubscriber(bus, "My_subscription_id_prefix")
{
    MessageDispatcher = new WindsorMessageDispatcher(container)
};
autoSubscriber.Subscribe(GetType().Assembly);
autoSubscriber.SubscribeAsync(GetType().Assembly);
Now each time a message arrives, a new instance of our consumer will be resolved from our container.
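The same dispatch pattern does not depend on Windsor. As a rough sketch, a dispatcher could take any resolver delegate, which makes it easy to plug in another container or a test double. The interfaces below are local stand-ins that mirror the signatures used in the Windsor example so the code compiles without the library; FactoryMessageDispatcher, Ping and PingConsumer are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Stand-ins mirroring the signatures used in the Windsor example (assumption).
public interface IConsume<in T> where T : class { void Consume(T message); }
public interface IConsumeAsync<in T> where T : class { Task Consume(T message); }

public interface IAutoSubscriberMessageDispatcher
{
    void Dispatch<TMessage, TConsumer>(TMessage message)
        where TMessage : class where TConsumer : IConsume<TMessage>;
    Task DispatchAsync<TMessage, TConsumer>(TMessage message)
        where TMessage : class where TConsumer : IConsumeAsync<TMessage>;
}

// Hypothetical dispatcher: resolves each consumer through a caller-supplied factory.
public class FactoryMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly Func<Type, object> resolve;

    public FactoryMessageDispatcher(Func<Type, object> resolve)
    {
        this.resolve = resolve ?? throw new ArgumentNullException(nameof(resolve));
    }

    public void Dispatch<TMessage, TConsumer>(TMessage message)
        where TMessage : class where TConsumer : IConsume<TMessage>
    {
        var consumer = (TConsumer)resolve(typeof(TConsumer));
        consumer.Consume(message);
    }

    public Task DispatchAsync<TMessage, TConsumer>(TMessage message)
        where TMessage : class where TConsumer : IConsumeAsync<TMessage>
    {
        var consumer = (TConsumer)resolve(typeof(TConsumer));
        return consumer.Consume(message);
    }
}

public class Ping { public string Text { get; set; } }

public class PingConsumer : IConsume<Ping>
{
    public static int Handled;
    public void Consume(Ping message) { Handled++; }
}

public static class Program
{
    public static void Main()
    {
        // Activator.CreateInstance stands in for a real container here.
        var dispatcher = new FactoryMessageDispatcher(t => Activator.CreateInstance(t));
        dispatcher.Dispatch<Ping, PingConsumer>(new Ping { Text = "hello" });
        Console.WriteLine(PingConsumer.Handled); // prints 1
    }
}
```

Unlike the Windsor version, this sketch never releases the consumer, so it only suits containers or factories that do not track the instances they hand out.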