
Separating Channels and Actors


Terminology and clarifications

For the purposes of this document/page:

  • Cloud Haskell refers to the current implementation of distributed-process (from this repository, in fact), along with the other libraries authored by Duncan, Edsko, and other brilliant folks from Well Typed, and developed, maintained, and curated by the awesome people at Tweag I/O (Mathieu, Alexander, and Facundo, to mention a few), and Tim Watson

  • Akka refers to the Java/Scala implementation of Akka

  • Akka Streams refers to the Akka Scala API for Reactive Streams

  • Erlang refers to the open source implementation of Erlang/OTP

  • Actor refers to the universal primitive of concurrent computation under the actor model

  • In distributed-process terms, Actor refers to code running in the Process monad, which has been spawned on a local or remote node, using the primitives spawn, spawnLocal, spawnChan, and so on

  • In GHC-Haskell terms, an Actor in distributed-process is a forkIO thread which is managed by the local node infrastructure, in terms of its lifetime, and connection(s) to other actors in the system (whether distributed or otherwise)

  • Node infrastructure refers to the relationship between the local node and running processes in Cloud Haskell, viz. the data that is shared, and communicated, between forkIO threads, all of which is managed by the local node.

Motivation

  1. The need for actors to communicate with the outside world

Here when we say the outside world, we mean code that is not running in the Process monad, and which is not connected to the Cloud Haskell node infrastructure. For example, one might define a set of actors that run (either locally, or across a cluster of nodes) in Cloud Haskell, and provide a web based (i.e. HTTP) interface for managing (or otherwise interacting with) these actors. Equally, it seems likely (if not certain) that production code might need to interact with other services which are not implemented using the actor model (or even using Haskell, for that matter).

In most implementations, actors communicate with the outside world (and each other) using asynchronous message passing, usually offering some kind of opaque handle for third parties to use when sending messages. Examples of this include the ProcessId type from distributed-process, and the SendPort a type from the same library (which represents the sending end of a typed channel). Some implementations offer other means of addressing actors, for example:

  • The process registry in Erlang
  • The process registry in Cloud Haskell
  • Actor paths/URIs in Akka
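
To make these addressing modes concrete, here is a minimal sketch inside the Process monad (the ProcessId, SendPort, and registered name are assumed to exist already; the payloads are purely illustrative):

-- a hedged illustration of the addressing styles mentioned above
addressingExamples :: ProcessId -> SendPort String -> Process ()
addressingExamples pid sp = do
  send pid "sent directly, via an opaque ProcessId"
  sendChan sp "sent via the sending end of a typed channel"
  nsend "some-registered-name" "sent via the local process registry"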

Unlike in Erlang, asynchronous message passing is not a language-level primitive in Haskell, so we need to consider how code running outside of the Process monad, and thus outside the context of a managed Cloud Haskell node, ought to interact with actors running within the system. One option is to use the runProcess function from Control.Distributed.Process.Node, which essentially does the following:

  • create an empty MVar
  • spawn a new Process to run the supplied code (i.e. forkIO and execute the given actor)
  • wait on the actor finishing (i.e. the Process code completing) and then write () to the MVar
  • have the calling thread wait on the MVar to determine that the actor has finished working

I've elided the details of error handling, particularly asynchronous exception handling, inheriting the masking state of the calling thread, and so on...
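
Concretely, that amounts to something along these lines (a simplified sketch rather than the actual implementation, which also takes care of the points just elided):

-- a simplified, hand-waving sketch of what runProcess does
runProcessSketch :: LocalNode -> Process () -> IO ()
runProcessSketch node proc = do
  done <- newEmptyMVar
  _ <- forkProcess node (proc >> liftIO (putMVar done ()))
  takeMVar done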

If we want to send a message to an actor in the system from outside, then, we must forkIO a new thread and use thread synchronisation techniques (such as MVar) to determine that the send has completed. Since sending is asynchronous and should never fail - more on this later - we may instead choose to forkProcess in the calling thread, since we're not waiting on a reply anyway. As long as we do not mind subsequent actions in the calling thread (the one that spawned the new process) racing with the code that sends the message, this approach is fine. What if we wish to avoid having to fork for every send, though? We can obviously define an Actor as a dispatcher to handle this:

-- imports and some type decls elided for brevity

-- let's assume type InputData = () or some such ...
type InputData = ()
type ClientHandle = TQueue (String, InputData) -- usable outside `Process'

dispatcher :: TQueue (String, InputData) -> Process ()
dispatcher q = forever $ readQ q >>= uncurry nsend
  where readQ = liftIO . atomically . readTQueue

mkDispatcher :: LocalNode -> IO ClientHandle
mkDispatcher node = do
  q <- newTQueueIO
  forkProcess node (dispatcher q)
  return q

someCodeThatNeedsToSend :: ClientHandle -> IO ()
someCodeThatNeedsToSend hClient = do
  -- this might be in IO, or in some other monad (a web framework, etc)
  thing <- getFantasticThingThatActorsCanProcess
  atomically $ writeTQueue hClient ("destination.actor.name", thing)
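
Wiring this up from the outside might then look something like the following sketch, which assumes you already have a network-transport Transport in hand (e.g. from network-transport-tcp):

runExample :: Transport -> IO ()
runExample transport = do
  node    <- newLocalNode transport initRemoteTable
  hClient <- mkDispatcher node
  -- from here on we are in plain IO, with no reference to the node at all
  someCodeThatNeedsToSend hClient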

Except now we've lost a very useful property of using actors in the first place... We have no idea if the actor (viz, the forkIO thread we spawned using forkProcess earlier) is still running and consuming the TQueue. If we're expecting some side effect of sending the message to alter the state of the world at some point in time, this is obviously is not good. To make the issue more concrete, let's have the client handle provide an explicit reply channel (we'll ignore the fact this design ignores ordering when multiple clients write back...):

type InputData = ()
type OutputData = ()
type SendQ = TQueue (String, InputData) -- usable outside `Process'
type ReplyQ = TQueue (String, OutputData)
data ClientHandle = ClientHandle { sendQ :: SendQ, replyQ :: ReplyQ }

dispatcher :: TQueue (String, InputData) -> Process ()
dispatcher q = forever $ readQ q >>= uncurry nsend
  where readQ = liftIO . atomically . readTQueue

listener :: TQueue (String, OutputData) -> Process ()
listener q = forever $ expect >>= liftIO . atomically . writeTQueue q

mkDispatcher :: LocalNode -> IO ClientHandle
mkDispatcher node = do
  sQ <- newTQueueIO
  rQ <- newTQueueIO
  _ <- forkProcess node (dispatcher sQ)
  _ <- forkProcess node (listener rQ)  -- without this, replies would never reach replyQ
  return $ ClientHandle sQ rQ

someCodeThatNeedsToSend :: ClientHandle -> IO ()
someCodeThatNeedsToSend ClientHandle{..} = do
  -- this might be in IO, or in some other monad (a web framework, etc)
  thing <- getFantasticThingThatActorsCanProcess
  atomically $ writeTQueue sendQ ("destination.actor.name", thing)

someCodeThatWantsToGetTheReply :: ClientHandle -> IO OutputData
someCodeThatWantsToGetTheReply ClientHandle{..} = do
  (_, reply) <- atomically $ readTQueue replyQ
  return reply

  2. As per the above, we have a leaky abstraction...

What happens if the process you thought was registered at that name has died? What happens if it wasn't registered? If we were just passing data around using STM, at least the runtime system would have a chance to detect the deadlock, but here it's kept invisible from us, and as far as the RTS is concerned there may be no deadlock, if other threads are looking at the CQueue for the listener (for example), and so on.

Perhaps more to the point - since this discussion is not around correctness in and of itself - this places a lot of the burden for dealing with concurrency, cross-thread synchronised access to data, and all the things that go with that (e.g. exception safety, which I've blithely ignored in these examples) back on the client.

Cloud Haskell makes it possible to avoid these kinds of deadlocks by providing monitoring and supervision primitives; however, these are not usable outside of the context of the Cloud Haskell actor system. It is therefore awkward, if not impossible, to integrate Cloud Haskell code cleanly and safely with code that runs outside of the Process monad, unless the actor system is central to the design of the running application and the code which runs outside of the Process monad is a second class citizen to that which is managed by the CH node.
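
Inside the Process monad the dispatcher could, of course, be taught to resolve and monitor its destination before sending, and to surface failures onto a queue the client can drain. A rough sketch of that idea follows (the error-queue shape and the names are illustrative, not part of any existing API):

safeDispatch :: TQueue String -> String -> InputData -> Process ()
safeDispatch errQ name msg = do
  mPid <- whereis name
  case mPid of
    Nothing  -> report ("nothing is registered as " ++ name)
    Just pid -> do
      ref <- monitor pid
      send pid msg
      -- a monitor notification that has already arrived tells us the
      -- destination died; otherwise we assume the send will be delivered
      notif <- receiveTimeout 0
                 [ matchIf (\(ProcessMonitorNotification r _ _) -> r == ref) return ]
      unmonitor ref
      case notif of
        Just (ProcessMonitorNotification _ _ why) -> report (name ++ " died: " ++ show why)
        Nothing                                   -> return ()
  where report = liftIO . atomically . writeTQueue errQ

Even so, the client on the far side of the TQueue cannot link, monitor, or supervise anything itself, which is exactly the asymmetry described above.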

I believe this is a major blocker for many people using Cloud Haskell in their projects. In the example of a web server that wishes to communicate with the actor system, it is very difficult to get the integration right, and it becomes especially problematic when the web application framework (on which the bulk of the application code is presumably being written) applies its own thread management, and possibly exception management, policies.

Erlang solves this problem by rote - virtually everything is a process, even port drivers running code using the FFI have to emulate the behaviour of an actor in the system (though NIFs are an exception to this, sort of). It is neither pragmatic, nor desirable for Cloud Haskell to take this approach. In Erlang several web frameworks have emerged, most of which deal with the networking (non-blocking socket I/O) by hand. But whilst Akka has demonstrated some great results using actors to process web requests in the Play Framework, I don't think Cloud Haskell has any appetite to compete with the likes of warp. Fundamentally, Cloud Haskell code needs to operate seamlessly with code written to run outside of the framework.

  3. Recognising that the actor model does not solve every problem

There are plenty of reasons why code which is not structured using the actor model might want to send Haskell data types across the wire, or even Haskell closures (or static pointers) over the network. Not only do we need to recognise that any actor implementation has to play nicely with the world outside of its managed infrastructure black box, we must also allow for distribution to happen outside of the actor model altogether.

Many fantastic and complex applications use network-transport without incorporating distributed-process and its Erlang-based semantics. I suspect that higher level abstractions such as typed channels and monitoring would be useful in these contexts, without forcing the actor model on anyone as a prerequisite for having them.
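
For reference, working with network-transport directly looks roughly like this (a sketch with error handling elided; the local endpoint and the peer's address are assumed to have been obtained elsewhere):

import Data.ByteString (ByteString)
import Network.Transport

-- open a lightweight connection to a peer endpoint and push raw bytes at it;
-- no actors, mailboxes or registries involved
sendRaw :: EndPoint -> EndPointAddress -> ByteString -> IO ()
sendRaw ep peer payload = do
  Right conn <- connect ep peer ReliableOrdered defaultConnectHints
  _ <- send conn [payload]
  close conn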

  4. Actors don't compose well

I don't think this really needs much explanation. Typed channels should compose well, since conceptually they are very simple. I suspect we can build a very strong and concrete API around these.

There are other channel-based concepts to consider too, such as the integration patterns: splitting, re-sequencing, aggregating, wire tapping, and so on.
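
As a small illustration of that simplicity, receive ports can already be fanned in and consumed uniformly using the existing API (imports elided as before; the names are illustrative):

-- two producers each get a SendPort; the consumer sees a single merged port
aggregator :: Process ()
aggregator = do
  (spA, rpA) <- newChan :: Process (SendPort Int, ReceivePort Int)
  (spB, rpB) <- newChan
  -- ... hand spA and spB to two different producer processes ...
  merged <- mergePortsBiased [rpA, rpB]
  forever $ receiveChan merged >>= say . show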

Segregating Local & Remote APIs

Location transparency is a double-edged sword. On the one hand, we would rather not deal with the intricacies of remote calls: how do we serialise data, how do we connect to the network, how do we handle network failures, and so on. On the other hand, if we don't know that an interaction might travel over the network, get delayed due to congestion, or find the recipient no longer running (or in no state to handle our inputs), how can we build reliability into our systems? Erlang offers to deal with all these issues for us, and when it works it is great not to have to worry about the difficult details. And when it doesn't work, it is difficult to understand what went wrong, precisely because we're used to the platform, the framework, and the language taking care of things for us.

In Cloud Haskell, real location transparency means observing constraints for the remote case even in local cases. Erlang has no concept of data that cannot be serialised (actually that's not strictly true: passing handles that would be of no use on a remote node will not fail as such - the data will travel over the wire, but will either be unusable or lead to a crash on one side or the other). In Cloud Haskell, honouring those same constraints means taking a significant performance hit for local message passing as well. In an early version of distributed-process we created an API that allowed you to send locally without serialising the message unnecessarily, which improved performance massively.
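
If memory serves, that idea survives as the "unsafe" variants of the messaging primitives; the sketch below assumes a module Control.Distributed.Process.UnsafePrimitives whose send mirrors the ordinary one, so treat the exact names as an assumption rather than gospel:

import Control.Distributed.Process
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe

-- both lines mean the same thing for a node-local peer, but the unsafe
-- variant skips the encode/decode round trip (assumed API, see above)
notifyLocally :: ProcessId -> String -> Process ()
notifyLocally pid msg = do
  send pid msg          -- always pays the serialisation cost
  Unsafe.send pid msg   -- avoids it when the destination is local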

  • Exercise: Since we do want distribution, what separates channels with remote endpoints from local ones?
  • Exercise: What would it mean to compose local and remote channels?