Pete Smith edited this page Jul 8, 2014 · 20 revisions

An AzureNetQ subscriber subscribes to a particular message type (the .NET type of the message class). Once a handler has been attached by calling the Subscribe method, a persistent Azure Topic is created on the Service Bus broker, with a relevant Subscription created within it.

While the subscribing application is running, this handler will be invoked whenever a message arrives on the Subscription. For example:

bus.Subscribe<MyMessage>(msg => Console.WriteLine(msg.Text));

Now every time that an instance of MyMessage is published, AzureNetQ will call our attached handler and print the message’s Text property to the console.

Subscription Ids and Competing Consumers

In the above example, a Topic was created for MyMessage on the broker and a default Subscription was created within it. If multiple handlers are attached for the same message type then they will re-use the same Subscription - each message will be processed only once with the handlers taking turns. This pattern is known as competing consumers.

We can specify that we want to create a separate Subscription via fluent configuration passed to the Subscribe method:

bus.Subscribe<MyMessage>(msg => Console.WriteLine("One: " + msg.Text), 
    x => x.WithSubscriptionId("One"));

bus.Subscribe<MyMessage>(msg => Console.WriteLine("Two: " + msg.Text), 
    x => x.WithSubscriptionId("Two"));

AzureNetQ will create a unique Subscription within a Topic on the Service Bus broker for each unique subscription id. Unless you want to handle the same message more than once - for instance when dealing with orthogonal concerns - you don't need to specify a subscription id.
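The difference between sharing a Subscription and using separate subscription ids can be sketched with a small in-memory model. This is an illustration only, not the AzureNetQ API: `TopicModel`, `TopicDemo` and the `"default"` id are hypothetical names, and the strict round-robin between competing handlers is a simplifying assumption.

```csharp
using System;
using System.Collections.Generic;

// In-memory model only: this is NOT the AzureNetQ API.
public class TopicModel
{
    // One entry per subscription id; each holds the handlers competing on it.
    private readonly Dictionary<string, List<Action<string>>> subscriptions =
        new Dictionary<string, List<Action<string>>>();

    // Round-robin cursor per subscription id (assumed delivery policy).
    private readonly Dictionary<string, int> next = new Dictionary<string, int>();

    public void Subscribe(string subscriptionId, Action<string> handler)
    {
        if (!subscriptions.ContainsKey(subscriptionId))
        {
            subscriptions[subscriptionId] = new List<Action<string>>();
            next[subscriptionId] = 0;
        }
        subscriptions[subscriptionId].Add(handler);
    }

    public void Publish(string text)
    {
        // Every subscription receives its own copy of the message...
        foreach (var pair in subscriptions)
        {
            var handlers = pair.Value;
            var index = next[pair.Key];
            // ...but only one handler within that subscription processes it.
            handlers[index % handlers.Count](text);
            next[pair.Key] = index + 1;
        }
    }
}

public static class TopicDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var topic = new TopicModel();

        // Two handlers on the same (hypothetical) default id compete.
        topic.Subscribe("default", m => log.Add("A:" + m));
        topic.Subscribe("default", m => log.Add("B:" + m));

        // A separate subscription id sees every message.
        topic.Subscribe("One", m => log.Add("One:" + m));

        topic.Publish("m1");
        topic.Publish("m2");
        return log;
    }
}
```

Calling `TopicDemo.Run()` yields four log entries: handlers A and B each process one of the two messages, while the handler on subscription "One" processes both.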

Considerations when writing the subscribe callback delegate

As messages are received from queues subscribed to via EasyNetQ, they are placed on an in-memory queue. A single thread sits in a loop taking messages from the queue and calling their Action delegates. Since the delegates are processed one at a time on a single thread, you should avoid long-running synchronous IO operations. Return control from the delegate as soon as possible.
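To see why a slow delegate holds up everything behind it, here is a minimal sketch of a single-threaded dispatch loop of the kind described above. It is an assumption-laden model, not the library's actual implementation: `DispatchLoopSketch` is a hypothetical name and `Thread.Sleep` stands in for a long-running synchronous IO call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Model of a single-threaded dispatch loop; not the library's real code.
public static class DispatchLoopSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var queue = new BlockingCollection<Action>();

        // A single worker thread drains the in-memory queue,
        // invoking one delegate at a time.
        var worker = new Thread(() =>
        {
            foreach (var handler in queue.GetConsumingEnumerable())
            {
                handler();
            }
        });
        worker.Start();

        // The slow handler is queued first (Sleep simulates blocking IO).
        queue.Add(() => { Thread.Sleep(200); log.Add("slow done"); });
        // The fast handler cannot start until the slow one returns.
        queue.Add(() => log.Add("fast done"));

        queue.CompleteAdding();
        worker.Join();
        return log;
    }
}
```

`DispatchLoopSketch.Run()` returns the entries in the order "slow done", "fast done": the second message waits for the full 200 ms even though its own handler is trivial.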

Use SubscribeAsync

SubscribeAsync allows your subscriber delegate to return a Task immediately and then execute long-running IO operations asynchronously. Once the long-running operation has finished, simply complete the Task. In the example below we make a request to a web service using an asynchronous IO operation (DownloadStringTask). When the task completes, we write a line to the console.

bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 
    new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
        .ContinueWith(task => 
            Console.WriteLine("Received: '{0}', Downloaded: '{1}'", 
                message.Text, 
                task.Result)));
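On .NET 4.5 or later, a Task-returning handler like the one above could also be written with async/await instead of ContinueWith. The sketch below shows only the shape of such a handler. `MyMessage` is redeclared here so the block is self-contained, `AsyncHandlerSketch` and `FetchAsync` are hypothetical names, and `Task.Delay` stands in for the real web request. Whether a method like this can be passed directly to SubscribeAsync depends on the overloads AzureNetQ exposes, which is not verified here.

```csharp
using System;
using System.Threading.Tasks;

// Redeclared for self-containment; the real message class lives in your code.
public class MyMessage
{
    public string Text { get; set; }
}

public static class AsyncHandlerSketch
{
    // Hypothetical stand-in for a long-running asynchronous IO call.
    public static async Task<string> FetchAsync(Uri uri)
    {
        await Task.Delay(50); // simulates network latency without blocking a thread
        return "payload from " + uri;
    }

    // Returns a Task immediately; the continuation after 'await'
    // runs once the simulated IO has completed.
    public static async Task<string> HandleAsync(MyMessage message)
    {
        var downloaded = await FetchAsync(new Uri("http://localhost:1338/?timeout=500"));
        return string.Format("Received: '{0}', Downloaded: '{1}'",
            message.Text, downloaded);
    }
}
```

The handler returns its formatted line rather than writing to the console so that the result is easy to inspect. A real handler would typically return a plain Task and perform its side effect inside the method.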

Cancelling subscriptions

All the subscribe methods return an IDisposable. You can cancel a subscription at any time by calling Dispose on the returned instance:

var consumer = bus.Subscribe<MyMessage>("sub_id", MyHandler);

...

consumer.Dispose();

This will stop EasyNetQ consuming from the queue and close the consumer's channel.
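The pattern of returning an IDisposable that undoes a subscription can be sketched in isolation. This is a generic illustration of the pattern, not the library's implementation: `HandlerRegistry`, `Unsubscriber` and `RegistryDemo` are hypothetical names, and nothing here touches a real broker.

```csharp
using System;
using System.Collections.Generic;

// Generic "subscribe returns IDisposable" pattern; not the library's real code.
public class HandlerRegistry
{
    private readonly List<Action<string>> handlers = new List<Action<string>>();

    public IDisposable Subscribe(Action<string> handler)
    {
        handlers.Add(handler);
        // Disposing the returned token removes the handler again.
        return new Unsubscriber(() => handlers.Remove(handler));
    }

    public void Publish(string text)
    {
        // Copy first so a handler may dispose itself while being invoked.
        foreach (var handler in handlers.ToArray())
        {
            handler(text);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action onDispose;

        public Unsubscriber(Action onDispose)
        {
            this.onDispose = onDispose;
        }

        public void Dispose()
        {
            // Safe to call more than once: the cleanup runs only the first time.
            var action = onDispose;
            onDispose = null;
            if (action != null)
            {
                action();
            }
        }
    }
}

public static class RegistryDemo
{
    public static int Run()
    {
        var received = 0;
        var registry = new HandlerRegistry();

        var consumer = registry.Subscribe(m => received++);
        registry.Publish("first");   // handled
        consumer.Dispose();
        registry.Publish("second");  // no longer handled
        consumer.Dispose();          // second Dispose is a no-op

        return received;
    }
}
```

`RegistryDemo.Run()` returns 1: only the message published before Dispose reaches the handler.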

Distributed processing out-of-the-box

EasyNetQ and RabbitMQ provide distributed processing out-of-the-box. Say we have written a Windows service with a single call to Subscribe, just like the one above. We deploy it on a server and start it up. When the Subscribe call runs, EasyNetQ creates a queue called something like 'someNamespace_myMessage:someAssembly_mySubscriptionId' on the RabbitMQ broker. As instances of MyMessage are published, they are routed to this queue and our Windows service gets a copy of every message. This is exactly what we want.

Now, what if we deploy a second instance of our Windows service on a second server and start it up? When the Subscribe call runs, EasyNetQ will find that there’s already a queue that matches the subscriber id / message type combination, so instead of creating a new queue it will simply start consuming from the existing queue created by the first instance. When RabbitMQ has two consumers consuming from the same queue, it sends messages to the consumers in turn, round-robin style. So the first message will be sent to the first instance, the second to the second instance, the third to the first instance, and so on. We get distributed processing out-of-the-box, with no need for any special programming techniques when writing our subscribers, or for special software or hardware load balancers.
