Skip to content
Pete Smith edited this page Jul 8, 2014 · 20 revisions

An AzureNetQ subscriber subscribes to a particular message type (the .NET type of the message class). Once a handler has been attached by calling the Subscribe method, a persistent Azure Topic will be created on the Service Bus broker and a relevent Subscription created within it.

While the subscribing application is running, Service Bus will invoke this handler whenever a message arrives on the Subscription. For example:

bus.Subscribe<MyMessage>(msg => Console.WriteLine(msg.Text));

Now every time that an instance of MyMessage is published, AzureNetQ will call our attached handler and print the message’s Text property to the console.

Subscription Ids and Competing Consumers

In the above example, a Default Subscription was created on the Service Bus broker. If multiple handlers are attached via subscribefor the same message type, then they will re-use the same Subscription. This means that messages sent to the Subscription via the Topic will be shared out among the handlers

The subscription id that you pass to Subscribe is important. EasyNetQ will create a unique queue on the RabbitMQ broker for each unique combination of message type and subscription id. The general rule you should follow is that each call to subscribe should have its own id. Indeed, this is a case where you shouldn’t follow normal good practice and where a hard-coded string is probably the best option.

Considerations when writing the subscribe callback delegate

As messages are received from queues subscribed to via EasyNetQ, they are placed on an in-memory queue. A single thread sits in a loop taking messages from the queue and calling their Action delegates. Since the delegates are processed one at a time on a single thread, you should avoid long-running synchronous IO operations. Return control from the delegate as soon as possible.

Use SubscribeAsync

SubscribeAsync allows your subscriber delegate to return a Task immediately and then asynchronously execute long-running IO operations. Once the long-running subscription is complete, simply complete the Task. In the example below we are making a request to a web service using an asynchronous IO operation (DownloadStringTask). When the task completes, we write a line to the console.

bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 
    new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
        .ContinueWith(task => 
            Console.WriteLine("Received: '{0}', Downloaded: '{1}'", 
                message.Text, 
                task.Result)));

Cancelling subscriptions

All the subscribe methods return an IDisposable. You can cancel a subscriber at any time by calling Dispose on the IDisposable instance:

var consumer = bus.Subscribe<MyMessage>("sub_id", MyHandler);

...

consumer.Dispose();

This will stop EasyNetQ consuming from the queue and close the consumer's channel.

Distributed processing out-of-the-box

EasyNetQ and RabbitMQ provide distributed processing out-of-the-box. Say we have written a windows service with a single call to subscribe just like the one above. We deploy it on a server and start it up. When the Subscribe call is run EasyNetQ creates a queue called something like 'someNamespace_myMessage:someAssembly_mySubscriptionId' on the RabbitMQ broker. As instances of MyMessage are published they are routed to this queue and our windows service gets a copy of every message. This is exactly what we want.

Now, what if we deploy a second instance of our windows service on a second server and start it up? When the Subscribe call runs, EasyNetQ will find that there’s already a queue that matches the subscriber id / message type combination, so instead of creating a new queue it will simply start consuming from the existing queue created by the first instance. When RabbitMQ has two consumers consuming from the same queue, it sends messages to the consumers, round-robin style, in turn. So the first message will be sent the the first instance, the second to the second instance and then the third to the first instance, and so on. We get distributed processing out-of-the-box, with no need for any special programming techniques when writing our subscribers, or special software or hardware load balancers.

Clone this wiki locally