One of the first questions developers ask once they learn how Akka.NET actors work is…
"If actors can only process one message at a time, can I still use
asyncmethods orTask<T>objects inside my actors?"
Yes! You can still use asynchronous methods and Task<T> objects inside your actors - using the PipeTo pattern (instead of using await)!
This lesson will show you how.
"But wait!", you say. "Aren't actors already asynchronous?"
Indeed they are, and you make an astute point! Due to the nature of passing immutable messages between actors, actors are inherently thread-safe and asynchronous (they don't block each other).
But what if you want to do some asynchronous work from within an actor itself, such as kick off a long-running HTTP request via a Task?
Most developers would default to using await, which has achieved demigod status since its release in 2012.
And they would be making the wrong choice.
Why? To answer that, we need to review how actors process messages.
Actors process the contents of their mailbox one message at a time. It looks like this:
NOTE: If you're following along using the eBook / .ePub, you won't see the animation. Click here to see it.
Why is maintaining this behavior critical?
Recall that immutable messages themselves are inherently thread-safe, since a different thread can't modify something that is immutable.
BUT: while the messages are inherently thread-safe, the message-processing code has no such guarantee!
Processing one message at a time is critical because making sure an actor's message processing code (OnReceive) can only be run one invocation at a time is how Akka.NET enforces thread-safety for all of the code that executes inside an actor.
An immutable message is pushed from the mailbox into OnReceive. Once the call to OnReceive exits, the actor's mailbox pushes a new message into the actor's OnReceive method.
That being said, it's still possible to take advantage of async methods and methods that return Task<T> objects inside the OnReceive method - you just have to use the PipeTo extension method!
The PipeTo pattern is a simple extension method built into Akka.NET that you can append to any Task<T> object.
public static Task PipeTo<T>(this Task<T> taskToPipe,
ICanTell recipient, IActorRef sender = null)The goal behind PipeTo is to treat every async operation just like any other method that can produce a message for an actor's mailbox.
THAT is the right way to think about actors and concurrent Task<T>s in Akka.NET. A Task<T> is not something you await on in Akka.NET. It's just something else that produces a message for an actor to process through its mailbox.
The PipeTo method takes an ICanTell object as a required argument, which tells the method where to pipe the results of an asynchronous Task<T>.
Here are all of the Akka.NET classes that you can use with ICanTell:
IActorRef- a reference to an actor instance.ActorSelection- a selection of actors at a specified address. This is what gets returned whenever you look up an actor based on its path.
Most of the time, you're going to want to have your actors pipe the results of a task back to themselves. Here's an example of a real-world use case for PipeTo, drawn from our official Akka.NET PipeTo code sample.
// time to kick off the feed parsing process, and send the results to this same actor
Receive<BeginProcessFeed>(feed =>
{
SendMessage(string
.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));
_feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(Self);
});View the full source for this example..
Whenever you kick off a Task<T> and use PipeTo to deliver the results to some IActorRef or ActorSelection, here's how your actor is really processing its mailbox.
NOTE: If you're following along using the eBook / .ePub, you won't see the animation. Click here to see it.
In this case we're using PipeTo to send the results back to itself, but you can just as easily send these results to different actor.
The important thing to notice in this animation is that the actor continues processing other messages while the asynchronous operation is happening.
That's why PipeTo is great for allowing your actors to parallelize long-running tasks, like HTTP requests.
Have some post-processing you need to do on a Task<T> before the result gets piped into an actor's mailbox? No problem - you can still use ContinueWith and all of the other TPL design patterns you used in procedural C# programming.
Here's another example from our PipeTo code sample:
// asynchronously download the image and pipe the results to ourself
_httpClient.GetAsync(imageUrl).ContinueWith(httpRequest =>
{
var response = httpRequest.Result;
// successful img download
if (response.StatusCode == HttpStatusCode.OK)
{
var contentStream = response.Content.ReadAsStreamAsync();
try
{
contentStream.Wait(TimeSpan.FromSeconds(1));
return new ImageDownloadResult(image,
response.StatusCode, contentStream.Result);
}
catch //timeout exceptions!
{
return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
}
}
return new ImageDownloadResult(image, response.StatusCode);
},
TaskContinuationOptions.ExecuteSynchronously)
.PipeTo(Self);View the full source for this example..
So in this case, we're downloading an image via a HttpClient inside an Akka.NET actor, and we want to check the status code of the HTTP response before we use PipeTo to deliver a message back to this actor.
So we do the HTTP code handling inside a ContinueWith block and use that to return an ImageDownloadResult message that will be piped to the actor using the PipeTo block. Pretty easy!
Per the Akka.NET v1.0 release notes, native support for async and await is now available inside ReceiveActors.
public class MyActor : ReceiveActor
{
public MyActor()
{
ReceiveAsync<SomeMessage>(async some => {
//we can now safely use await inside this receive handler
await SomeAsyncIO(some.Data);
Sender.Tell(new EverythingIsAllOK());
});
}
}There's some magic under the hood that takes care of this.
However, the PipeTo pattern is still the preferred way to perform async operations inside an actor, as it is more explicit and clearly states what is going on and await suspends the actors' mailbox between continuations in order to honor the actor's "one message at a time" guarantee.
Yes, you need to close over any state whose value might change between messages that you need to use inside your ContinueWith or PipeTo calls.
This usually means closing over the Sender property and any private state you've defined that is likely to change between messages.
For instance, the Sender property of your actor will definitely change between messages. You'll need to use a C# closure for this property in order to guarantee that any asynchronous methods that depend on this property get the right value.
Doing a closure is as simple as stuffing the property into an instance variable (var) and using that instance variable in your PipeTo call, instead of the field or property defined on your actor.
Here's an example of closing over the Sender property:
Receive<BeginProcessFeed>(feed =>
{
// instance variable for closure
// close over the current value of Sender, since it changes between
// messages and accessing by property later would give different value
var senderClosure = Sender;
SendMessage(string
.Format("Downloading {0} for RSS/ATOM processing...", feed.FeedUri));
// send result of this async task back to the sender of the current message
_feedFactory.CreateFeedAsync(feed.FeedUri).PipeTo(senderClosure);
});NOTE: Assuming you're piping the result of the
Taskback to the same actor, you don't need to close overSelforParent. ThoseIActorRefs will be the same when theTaskreturns. You just need to close over the state that is going to change by the time theTaskcompletes and executes its continuation delegate.
Now, let's get to work and use this powerful parallelism technique inside our actors!
Currently our GithubWorkerActor instances all block when they're waiting for responses back from the GitHub API, using the following code:
var getStarrer = _gitHubClient.Activity.Starring.GetAllForUser(starrer);
// ewww
getStarrer.Wait();
var starredRepos = getStarrer.Result;
Sender.Tell(new StarredReposForUser(starrer, starredRepos));We're going to leverage the full power of the TPL and allow each of our GithubWorkerActor instances kick off multiple parallel Octokit queries at once, and then use PipeTo to asynchronously deliver the completed results back to our GithubCoordinatorActor.
Take note - this the current speed of our GitHub scraper at the end of lesson 2:
NOTE: If you're following along using the eBook / .ePub, you won't see the animation. Click here to see it.
Open up Actors/GithubWorkerActor.csand replace the InitialReceives method with the following code:
private void InitialReceives()
{
// query an individual starrer
Receive<RetryableQuery>(query => query.Query is QueryStarrer, query =>
{
// ReSharper disable once PossibleNullReferenceException
// (we know from the previous IS statement that this is not null)
var starrer = (query.Query as QueryStarrer).Login;
// close over the Sender in an instance variable
var sender = Sender;
_gitHubClient.Activity.Starring.GetAllForUser(starrer)
.ContinueWith<object>(tr =>
{
// query faulted
if (tr.IsFaulted || tr.IsCanceled)
return query.NextTry();
// query succeeded
return new StarredReposForUser(starrer, tr.Result);
}).PipeTo(sender);
});
// query all starrers for a repository
Receive<RetryableQuery>(query => query.Query is QueryStarrers, query =>
{
// ReSharper disable once PossibleNullReferenceException
// (we know from the previous IS statement that this is not null)
var starrers = (query.Query as QueryStarrers).Key;
// close over the Sender in an instance variable
var sender = Sender;
_gitHubClient.Activity.Starring.GetAllStargazers(starrers.Owner, starrers.Repo)
.ContinueWith<object>(tr =>
{
// query faulted
if (tr.IsFaulted || tr.IsCanceled)
return query.NextTry();
return tr.Result.ToArray();
}).PipeTo(sender);
});
}That's it!
Build and run GithubActors.sln - the performance should be really fast now.
NOTE: If you're following along using the eBook / .ePub, you won't see the animation. Click here to see it.
At the start of the lesson, it took us 4 seconds to download our first 4 users for https://github.com/petabridge/akka-bootcamp. At the end of the lesson we downloaded 22 users in 4 seconds. All of this without adding any new actors or doing anything other than just letting the TPL work in concert via PipeTo.
NOTE: The GitHub API appears to be really slow for a handful of users on every repository we've tested. We have no idea why.
Awesome - now you can use Task<T> instances in combination with your actors for maximum concurrency! Hooray!
Now it's time to move onto the final lesson: Lesson 5 - How to prevent deadlocks with ReceiveTimeout.
See our full Akka.NET PipeTo sample.
Come ask any questions you have, big or small, in this ongoing Bootcamp chat with the Petabridge & Akka.NET teams.
If there is a problem with the code running, or something else that needs to be fixed in this lesson, please create an issue and we'll get right on it. This will benefit everyone going through Bootcamp.



