Separating Channels and Actors
For the purposes of this document/page:
- Cloud Haskell refers to the current implementation of distributed-process (from this repository, in fact) and the other libraries authored by Duncan, Edsko, and other brilliant folks from Well Typed, and developed, maintained, and curated by the awesome people at Tweag I/O (Mathieu, Alexander, and Facundo, to mention a few) and Tim Watson
- Akka refers to the Java/Scala implementation of Akka
- Akka Streams refers to the Akka Scala API for Reactive Streams
- Erlang refers to the open source implementation of Erlang/OTP
- Actor refers to the universal primitive of concurrent computation under the actor model
- In distributed-process terms, Actor refers to code running in the Process monad, which has been spawned on a local or remote node using the primitives spawn, spawnLocal, spawnChannel, and so on
- In GHC-Haskell terms, an Actor in distributed-process is a forkIO thread which is managed by the local node infrastructure, in terms of its lifetime and its connection(s) to other actors in the system (whether distributed or otherwise)
- Node infrastructure refers to the relationship between the local node and running processes in Cloud Haskell, viz. the data that is shared and communicated between forkIO threads, all of which is managed by the local node.
- The need for actors to communicate with the outside world
Here, when we say the outside world, we mean code that is not running in the Process monad, and which is not connected to the Cloud Haskell node infrastructure. For example, one might define a set of actors that run (either locally, or across a cluster of nodes) in Cloud Haskell, and provide a web based (i.e. HTTP) interface for managing (or otherwise interacting with) these actors. Equally, it seems likely (if not certain) that production code might need to interact with other services which are not implemented using the actor model (or even using Haskell, for that matter).
In most implementations, actors communicate with the outside world (and each other) using asynchronous message passing, usually offering some kind of opaque handle for third parties to use when sending messages. Examples of this include the ProcessId type from distributed-process, and the SendPort a type from the same library (which represents the sending end of a typed channel). Some implementations offer other means of addressing actors, for example:
- The process registry in Erlang
- The process registry in Cloud Haskell
- Actor paths/URIs in Akka
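To make the opaque handles mentioned above concrete, here is a brief example using the real distributed-process API (send, sendChan, newChan, spawnLocal, expect, receiveChan and say all exist as shown); the actor bodies and the name addressingExample are contrived for illustration.

import Control.Distributed.Process

addressingExample :: Process ()
addressingExample = do
  -- address an actor via its opaque ProcessId
  pid <- spawnLocal $ expect >>= say
  send pid "hello via ProcessId"
  -- address the sending end of a typed channel via a SendPort
  (sp, rp) <- newChan :: Process (SendPort String, ReceivePort String)
  _ <- spawnLocal $ receiveChan rp >>= say
  sendChan sp "hello via SendPort"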
Unlike Erlang, Haskell does not offer asynchronous message passing as a language level primitive, so we need to consider how code running outside of the Process monad, and thus outside the context of a managed Cloud Haskell node, ought to interact with actors running within the system. One option is to use the runProcess function, defined in the API for Control.Distributed.Process.Node, which essentially does the following:
- create an empty MVar
- spawn a new Process to run the supplied code (i.e. forkIO and execute the given actor)
- wait on the actor finishing (i.e. the Process code completing) and write () to the MVar
- have the calling thread wait on the MVar to determine that the actor has finished working
I've elided the details of error handling, particularly asynchronous exception handling, inheriting the masking state of the calling thread, and so on...
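A minimal sketch of those steps follows, ignoring the exception handling and masking concerns just mentioned; the name runProcessSketch is made up, but forkProcess and the MVar operations are the real APIs.

import Control.Concurrent.MVar
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Control.Monad.IO.Class (liftIO)

-- a simplified picture of what runProcess does under the hood
runProcessSketch :: LocalNode -> Process () -> IO ()
runProcessSketch node proc = do
  done <- newEmptyMVar              -- create an empty MVar
  _ <- forkProcess node $ do        -- spawn a new Process to run the supplied code
         proc
         liftIO $ putMVar done ()   -- write () to the MVar once the actor completes
  takeMVar done                     -- the calling thread waits on the MVar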
If we want to send a message to an actor in the system from outside then, we must forkIO a new thread and use thread synchronisation techniques (like MVar) to determine that the sending completed. Since sending is asynchronous and should never fail - more on this later - we may instead choose to forkProcess in the calling thread, since we're not waiting on a reply anyway. As long as we do not mind the actions that follow in the calling thread racing with the newly spawned process that sends the message, this approach is fine. What if we wish to avoid having to forkIO for every send though? We can obviously define an Actor to act as a dispatcher to handle this:
-- imports and some type decls elided for brevity
-- let's assume type InputData = () or some such ...
type InputData = ()

type ClientHandle = TQueue (String, InputData) -- usable outside `Process'

dispatcher :: TQueue (String, InputData) -> Process ()
dispatcher q = forever $ readQ q >>= uncurry nsend
  where readQ = liftIO . atomically . readTQueue

mkDispatcher :: LocalNode -> IO ClientHandle
mkDispatcher node = do
  q <- newTQueueIO
  _ <- forkProcess node (dispatcher q)
  return q

someCodeThatNeedsToSend :: ClientHandle -> IO ()
someCodeThatNeedsToSend hClient = do
  -- this might be in IO, or in some other monad (a web framework, etc)
  thing <- getFantasticThingThatActorsCanProcess
  atomically $ writeTQueue hClient ("destination.actor.name", thing)
Except now we've lost a very useful property of using actors in the first place... We have no idea if the actor (viz, the forkIO thread we spawned using forkProcess earlier) is still running and consuming the TQueue. If we're expecting some side effect of sending the message to alter the state of the world at some point in time, this is obviously not good. To make the issue more concrete, let's have the client handle provide an explicit reply channel (we'll ignore the fact that this design ignores ordering when multiple clients write back...):
type InputData = ()
type OutputData = ()

type SendQ = TQueue (String, InputData) -- usable outside `Process'
type ReplyQ = TQueue (String, OutputData)

data ClientHandle = ClientHandle { sendQ :: SendQ, replyQ :: ReplyQ }

dispatcher :: TQueue (String, InputData) -> Process ()
dispatcher q = forever $ readQ q >>= uncurry nsend
  where readQ = liftIO . atomically . readTQueue

listener :: TQueue (String, OutputData) -> Process ()
listener q = forever $ expect >>= liftIO . atomically . writeTQueue q

mkDispatcher :: LocalNode -> IO ClientHandle
mkDispatcher node = do
  sQ <- newTQueueIO
  rQ <- newTQueueIO
  _ <- forkProcess node (dispatcher sQ)
  _ <- forkProcess node (listener rQ)  -- don't forget to run the listener too
  return $ ClientHandle sQ rQ

someCodeThatNeedsToSend :: ClientHandle -> IO ()
someCodeThatNeedsToSend ClientHandle{..} = do
  -- this might be in IO, or in some other monad (a web framework, etc)
  thing <- getFantasticThingThatActorsCanProcess
  atomically $ writeTQueue sendQ ("destination.actor.name", thing)

someCodeThatWantsToGetTheReply :: ClientHandle -> IO OutputData
someCodeThatWantsToGetTheReply ClientHandle{..} = do
  (_, reply) <- atomically $ readTQueue replyQ
  return reply
- As per the above, we have a leaky abstraction...
What happens if the process you thought was registered at that name has died? What happens if it was never registered at all? If we were just passing data around using STM, at least the runtime system would have a chance to detect the deadlock, but here the failure is kept invisible from us, and as far as the RTS is concerned there may be no deadlock at all, if (for example) other threads are still reading from the listener's CQueue, and so on.
Perhaps more to the point - since this discussion is not around correctness in and of itself - this places a lot of the burden for dealing with concurrency, cross-thread synchronised access to data, and all the things that go with that (e.g. exception safety, which I've blithely ignored in these examples) back on the client.
Cloud Haskell makes it possible to avoid these kinds of deadlocks by providing monitoring and supervision primitives; however, they're not usable outside of the context of the Cloud Haskell actor system. It is therefore awkward, if not impossible, to integrate Cloud Haskell code cleanly and safely with code that runs outside of the Process monad, unless the actor system is central to the design of the running application, and the code which runs outside of the Process monad is a second class citizen to that which is managed by the CH node.
I believe this is a major blocker for many people wanting to use Cloud Haskell in their projects. In the example of a web server that wishes to communicate with the actor system, it is very difficult to get this right, especially when the web application framework on which the bulk of the application code is presumably being written applies its own thread management (and possibly exception management) policies.
Erlang solves this problem by rote - virtually everything is a process, even port drivers running code using the FFI have to emulate the behaviour of an actor in the system (though NIFs are an exception to this, sort of). It is neither pragmatic, nor desirable for Cloud Haskell to take this approach. In Erlang several web frameworks have emerged, most of which deal with the networking (non-blocking socket I/O) by hand. But whilst Akka has demonstrated some great results using actors to process web requests in the Play Framework, I don't think Cloud Haskell has any appetite to compete with the likes of warp. Fundamentally, Cloud Haskell code needs to operate seamlessly with code written to run outside of the framework.
- Recognising that the actor model does not solve every problem
There are plenty of reasons why code which is not structured using the actor model might want to send Haskell data types across the wire, or even Haskell closures (or static pointers) over the network. Not only do we need to recognise that any actor implementation needs to play nicely with the world outside of its managed infrastructure black box, we must also allow for the desire for distribution to happen outside of the actor model altogether.
Many fantastic and complex applications use network-transport without incorporating distributed-process and its Erlang based semantics. I suspect that higher level abstractions such as typed channels and monitoring would be useful in these contexts, without forcing the actor model as a requirement for having these things.
- Actors don't compose well
I don't think this really needs much explanation. Typed channels should compose perfectly well. I suspect we can build a very strong and concrete API around this.
There are other channel based concepts to consider too, such as the integration patterns of splitting, re-sequencing, aggregating, dead letter handling, wire tapping, and so on.
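As a taste of the sort of thing that becomes easy once channels stand alone, here is a hedged sketch of a wire tap expressed over plain STM queues (no channel API is assumed yet): everything read from the input is forwarded downstream, with a copy written to a tap.

import Control.Concurrent.STM
import Control.Monad (forever)

-- forward every item from input to output, copying it to the tap as we go
wireTap :: TQueue a -> TQueue a -> TQueue a -> IO ()
wireTap input output tap = forever $ atomically $ do
  x <- readTQueue input
  writeTQueue output x
  writeTQueue tap x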
- Exercise: Sketch out what channels might look like as independent from actors
We will start with local only channels and then consider remote channels. We will also start out considering unidirectional channels only. Bi-directional channels (i.e. request-reply channels) can be layered on top of the basic primitives, I would've thought.
Let's assume we have a fan-out exchange, and want to disperse data using it. There are a number of options for doing this currently. One of them is to use distributed-process-execution, and for a fan-out specifically, to use a broadcast exchange.
-- pseudocode sketch: broadcastExchange and post come from distributed-process-execution;
-- binding the workers to the exchange, the payload passed to post, and collectFrom
-- (gathering the replies) are all elided here
fanOut :: [(Int -> Int)] -> Process ()
fanOut xfms = do
  us <- getSelfPid
  ex <- broadcastExchange
  pids <- forM xfms $ \f -> spawnLocal $ do
    self <- getSelfPid
    expect >>= \i -> send us (self, f i)
  replicateM_ 10 (post ex)
  collectFrom pids
This model doesn't compose well or feel very appealing. The middle-man needs setting up explicitly before adding the end of the chain, and then starting the producers? Yuk...
On reflection though, I don't want to try and make this too compositional, or delve too deeply into routing. Doing so would effectively move us in the direction of a (reactive) streams API, which isn't the focus here. Instead, let's look at some pseudocode options for working with actors and channels. First, let's consider channels in isolation.
-- Q: do we want to ensure channels are used in (MonadIO m) => Monad m -> ...?
-- (pseudocode: source, yield and sink are not defined anywhere yet)
chewingOnChannels = do
  -- in some monad or other then..
  forM_ ([1..50] :: [Int]) $
    source yield <$> pure ((*) 2 :: Int -> Int) <*> (sink $ liftIO . putStrLn)
Obviously we could make that work without any concurrency at all. Let's assume our source is a channel that is being written to by some other thread, and that our sink behaves like a FIFO queue - something we could implement on the writing end of the sink with an STM TBQueue, for example.
This allows our reader to apply back-pressure to the sender rather easily. An even more direct form of back-pressure might involve using a TMVar or some such.
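For illustration, here is a minimal sketch of a bounded, back-pressured channel built directly on an STM TBQueue; the Channel type and the newBoundedChannel/writeChannel/readChannel names are hypothetical, not part of any existing API.

import Control.Concurrent.STM

newtype Channel a = Channel (TBQueue a)

newBoundedChannel :: Int -> IO (Channel a)
newBoundedChannel n = Channel <$> newTBQueueIO (fromIntegral n)

-- blocks (retries) once the queue is full, so a slow reader
-- automatically applies back-pressure to the writer
writeChannel :: Channel a -> a -> IO ()
writeChannel (Channel q) x = atomically (writeTBQueue q x)

readChannel :: Channel a -> IO a
readChannel (Channel q) = atomically (readTBQueue q)

Swapping the TBQueue for a TMVar would give the more direct, one-for-one hand-off mentioned above.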
I believe that different channel semantics are required for different situations. Our high level API should define primitives that allow channels to behave in various different ways, and that are amenable to different implementation strategies.
producer chan = forever $ writeChannel chan 42

consumer chan = forever $ readChannel chan >>= print

main = do
  -- this API is speculative, ofc. The point is to choose at channel creation
  -- time, the semantics we require of the channel. I have used the terminology
  -- _acquire_ to indicate the fact that for channels to function properly, we
  -- might need to set up some background infrastructure...
  chan <- acquireChannel ChannelType_X
  void $ forkIO $ producer chan -- starts flooding the channel immediately
  tid <- forkIO $ consumer chan -- reads from the channel continuously
  -- assuming our channel is either size bounded or 1-for-1 (TMVar, etc..)
  -- when we kill the consumer, the producer should block on writeChannel
  killThread tid -- speculative API, probably looks like throwTo
  -- we can allow the producer to progress a bit now
  replicateM_ 100 (readChannel chan)
  -- and now we can close the channel
  closeChannel chan
Regardless of whether or not a channel utilises back-pressure, we need signalling on its control plane.
API design question: does closing the channel terminate the producer, or does it provide a status datum as the result of writeChannel? Perhaps coupling the two APIs would make the most sense.
demo chan = do
  -- so writeChannel chan might throw ChannelClosedException
  -- API design question: do we wrap other synchronous/asynchronous exceptions
  -- in ChannelClosedException and re-throw?
  void $ forkIO $ forever $ void $
    (try (writeChannel chan "hello again") :: IO (Either ChannelClosedException ()))
  -- and an alternative that tells you if it succeeded or not...
  didWrite <- writeChannel chan "hi there" :: IO ChannelStatus
  case didWrite of
    ChannelOk      -> return ()
    ChannelClosed  -> putStrLn "hmm, that didn't work..."
    ChannelError e -> {- some exception -} print e
  -- lots of design questions about the above...
  -- now let's blow up that forkIO thread from above O_o
  closeChannel chan
I'm not sure there's any easy way to track producers and consumers of a channel automatically outside of an actor based implementation. Aside from introducing indeterminate GHC magic, I think the best approach here would be to provide an explicit API for doing this kind of thing. That might also be a useful place to introduce monadic execution contexts, which would give clients of the API the ability to interact with internal stuff in a frictionless manner. This is going on the assumption that our channel abstractions can be made to fit with Applicative and Alternative, which I would very much like. (Once we start looking at bi-directional channels, do we then get into Bifunctor and Contravariant land, and end up using Profunctors?)
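To make the Applicative/Alternative aspiration a little more concrete, here is a hedged sketch of a read-only channel view wrapped around an STM action; the Source type and recv are hypothetical names, and the instances simply fall out of STM's own Applicative and Alternative instances.

import Control.Applicative
import Control.Concurrent.STM

-- a read-only view of a channel: "give me the next value"
newtype Source a = Source { recv :: STM a }

instance Functor Source where
  fmap f (Source r) = Source (fmap f r)

instance Applicative Source where
  pure = Source . pure
  Source f <*> Source x = Source (f <*> x)

instance Alternative Source where
  empty = Source retry                          -- a source that never produces
  Source a <|> Source b = Source (a `orElse` b) -- try a, fall back to b

Returning to the explicit registration API mentioned above: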
demo1 chan = do
  -- we register ourselves and get on with it (note the implicit binding to `chan`)
  registerProducer chan $ forever $ writeChannel "Well hello there!"

demo2 chan = do
  -- we register ourselves again, and get on with it (note the implicit binding again)
  registerConsumer chan $ forever $ readChannel >>= putStrLn

demo3 chan = do
  void $ forkIO $ demo1 chan
  void $ forkIO $ demo2 chan
  channelSubscriptions chan >>= print
  channelProducers chan >>= print
  channelConsumers chan >>= print
  channelStats chan >>= print
This is all still quite a low level API, and that's really the point. If we get the layers defined cleanly, with clear semantics - at the API level, so implementors are responsible for meeting the demand - then we should be able to build richer capabilities on top of these.
Abstractions should not depend on details; rather, details should depend on abstractions. Or so the saying goes. I think figuring out where the interface/API segregation needs to occur when we combine these channel based APIs with remoting, and then with actors, will be tricky. Not to mention that we want to ensure we do not break the existing formal semantics for distributed-process - and I think we need to find ways to derive a set of formal semantics for this new API in its own right.
- Exercise: Sketch out different approaches for monitoring channels
All the examples above were predicated on some as-yet-undefined implementation which makes it possible to communicate across multiple threads via the channels. Monitoring channels is an altogether more complex thing to speculate about. Let's take a few high level options and discuss them without pseudocode for the time being...
- Monitoring using asynchronous exceptions
This is a fairly simple approach. For a given channel, we maintain some registry of interested clients. When certain channel events occur, we notify said clients by throwing an asynchronous exception at them.
The advantages of this approach lie in its simplicity. Adding oneself to the registry is presumably not an operation that will occur with excessive frequency, so we can do this in a critical section, and in the caller's thread. Let us assume that we have defined the generic definition of a channel using a record type, and that the specifics of how the channel is implemented are deferred to some ChannelType record, which supplies the thunks which we in turn evaluate in order to operate on it. At our outer level record, a TMVar holds the set of client ThreadIds to which we wish to throw asynchronous exceptions in the event of certain channel state changes...
-- assuming a Channel record which has a `Set ThreadId` stored in a `TMVar`
closeChannel Channel{..} = do
  tids <- atomically $ do
    ms <- takeTMVar monitors
    apiClose channelType -- this assumes that implementations must close in STM
    putTMVar monitors ms -- Q: is this really necessary?
    return ms
  forM_ tids (\tid -> throwTo tid ChannelClosed) -- ChannelClosed being our (speculative) exception
Loads of assumptions there - that closing a channel can take place in STM for any implementation, for example. I think that can probably hold, since channels that rely on code running in IO - for example channels that are backed by remoting - will presumably have to spin off threads to handle their network interactions. Perhaps someone does want a synchronous, serialised channel where readChannel accesses the network directly, but I don't see that as a common case, and such an implementation wouldn't be able to use network-transport directly, unless it was able to handle all the event types by itself (and was also comfortable with one channel consuming an entire TCP/IP connection all by itself!). Given that this is the case, apiClose can use STM to signal the networking thread and, if needed, wait for a reply using a TMVar or whatever.
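A hedged sketch of that arrangement, assuming a network-backed channel with a dedicated networking thread; NetChannelCtl, apiCloseNet and networkLoop are hypothetical names, and only standard stm primitives are used.

import Control.Concurrent.STM

data NetChannelCtl = NetChannelCtl
  { closeRequested :: TVar Bool   -- set inside closeChannel's STM transaction
  , closeAcked     :: TMVar ()    -- filled by the networking thread once torn down
  }

-- runs inside closeChannel's STM transaction: just request the close
apiCloseNet :: NetChannelCtl -> STM ()
apiCloseNet ctl = writeTVar (closeRequested ctl) True

-- run by the networking thread: block until a close is requested,
-- tear down the connection, then acknowledge
networkLoop :: NetChannelCtl -> IO () -> IO ()
networkLoop ctl teardown = do
  atomically $ readTVar (closeRequested ctl) >>= check
  teardown
  atomically $ putTMVar (closeAcked ctl) ()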
- Performance Considerations?
In the current implementation of distributed-process, there is already a good deal of inter-thread synchronisation using STM. Both the process mailbox and all typed channels have incoming messages written via an STM transaction, by either the network listener or the node controller thread - in fact both do this, depending on the source of the write. It looks like stm-channelize is trying to provide a generic API over the network side of things, but I'm not sure about the safety of that implementation either.
Now, an alternative implementation of closeChannel would be to use a signal. Looking at the code in Control.Concurrent.Event, I don't think it will work for this use case, but I suspect we can cook something up. The disadvantage of this approach, of course, is that the monitoring threads won't get the signal in a timely fashion - they'll only know about the channel's demise (or whatever other event took place) when they poll the signalling mechanism. I'm not really sure how valuable that approach is in practice.
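For completeness, a sketch of what such a poll-based mechanism might boil down to (names hypothetical; this is not the Control.Concurrent.Event API): closeChannel merely sets a flag, and monitors only notice it when they next poll.

import Control.Concurrent.STM

newtype ChannelEvent = ChannelEvent (TVar Bool)

-- called from closeChannel (or on any other channel state change)
signalEvent :: ChannelEvent -> STM ()
signalEvent (ChannelEvent flag) = writeTVar flag True

-- called periodically by a monitoring thread; nothing is pushed to it
pollEvent :: ChannelEvent -> IO Bool
pollEvent (ChannelEvent flag) = readTVarIO flag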
- Exercise: Since we do want distribution, what separates channels with remote endpoints from local ones?
- Exercise: What would it mean to compose local and remote channels?