-
Notifications
You must be signed in to change notification settings - Fork 10
Subscribe
An AzureNetQ subscriber subscribes to a particular message type (the .NET type of the message class). Once a handler has been attached by calling the Subscribe method, a persistent Azure Topic will be created on the Service Bus broker and a relevent Subscription created within it.
While the subscribing application is running, this handler will be invoked whenever a message arrives on the Subscription. For example:
bus.Subscribe<MyMessage>(msg => Console.WriteLine(msg.Text));
Now every time that an instance of MyMessage is published, AzureNetQ will call our attached handler and print the message’s Text property to the console.
In the above example, a Topic was created for MyMessage on the broker and a default Subscription was created within it. If multiple handlers are attached for the same message type then they will re-use the same Subscription - each message will be processed only once with the handlers taking turns. This pattern is known as competing consumers.
We can specify that we want to create a separate Subscription via fluent configuration passed to the Subscribe method. For instance in this example, each message will be received twice, once per subscription:
bus.Subscribe<MyMessage>(msg => Console.WriteLine("One: " + msg.Text),
x => x.WithSubscription("One"));
bus.Subscribe<MyMessage>(msg => Console.WriteLine("Two: " + msg.Text),
x => x.WithSubscription("Two"));
AzureNetQ will create a unique Subscription within a Topic on the Service Bus broker for each unique subscription name. Unless you want to handle the same message more than once - for instance when dealing with orthogonal concerns - you don't need to specify a subscription.
Please note: only one handler can be attached to each Subscription per instance of IBus.
SubscribeAsync allows your subscriber delegate to return a Task immediately and then asynchronously execute long-running IO operations. Once the long-running subscription is complete, simply complete the Task. In the example below we are making a request to a web service using an asynchronous IO operation (DownloadStringTask). When the task completes, we write a line to the console.
bus.SubscribeAsync<MyMessage>(message =>
new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
.ContinueWith(task =>
Console.WriteLine("Received: '{0}', Downloaded: '{1}'",
message.Text,
task.Result)));
All the subscribe methods return an IDisposable. You can cancel a subscriber at any time by calling Dispose on the IDisposable instance:
var consumer = bus.Subscribe<MyMessage>(MyHandler);
...
consumer.Dispose();
This will stop AzureNetQ consuming from the Subscription.
AzureNetQ and Service Bus provide distributed processing out-of-the-box. Say we have written a windows service with a single call to subscribe just like the one above. We deploy it on a server and start it up. When the Subscribe call is run AzureNetQ creates a Topic based on the message type, something like 'namespace.myMessage:assemblyname', and within that a subscription 'Default' (or use the subscription name if provided). As instances of MyMessage are published they are routed to this Subscription and our windows service gets a copy of every message. This is exactly what we want.
Now, what if we deploy a second instance of our windows service on a second server and start it up? When the Subscribe call runs, AzureNetQ will find that there’s already a Topic / Subscription that matches the message type / subscriber combination, so instead of creating a new one it will simply start consuming from the existing Subscription created by the first instance. When we have two consumers attached in this manner, it sends messages to the each in turn. So the first message will be sent the the first instance, the second to the second instance and then the third to the first instance, and so on. We get distributed processing out-of-the-box, with no need for any special programming techniques when writing our subscribers, or special software or hardware load balancers.
- Quick Start
- Introduction
- Casing in point: Topics and topics, Subscriptions and subscriptions
- Installing AzureNetQ
- Connecting to Service Bus
- Logging
- Publish
- Subscribe
- Request Response
- Send Receive
- Topic Based Routing
- Controlling Queue names
- Polymorphic Publish and Subscribe
- Scheduling Events with Future Publish
- Auto Subscriber
- Non Generic Publish & Subscription Extension Methods
- Replacing AzureNetQ Components
- Using a DI Container