diff --git a/distributed-process.cabal b/distributed-process.cabal index 3a938843..878a7e5c 100644 --- a/distributed-process.cabal +++ b/distributed-process.cabal @@ -71,6 +71,7 @@ Library Control.Distributed.Process.Internal.WeakTQueue, Control.Distributed.Process.Management, Control.Distributed.Process.Node, + Control.Distributed.Process.Node.RegistryAgent Control.Distributed.Process.Serializable, Control.Distributed.Process.UnsafePrimitives Control.Distributed.Process.Management.Internal.Agent, diff --git a/doc/semantics/CloudHaskellSemantics.tex b/doc/semantics/CloudHaskellSemantics.tex index 5a223e5b..625acc12 100644 --- a/doc/semantics/CloudHaskellSemantics.tex +++ b/doc/semantics/CloudHaskellSemantics.tex @@ -700,6 +700,23 @@ \subsection{Ordering and Typed Channels} for all messages from $P$ to $Q$, rather than using an ordered connection per typed channel plus one for direct messages. +\subsection{Registry} +The identifiers of both local and remote processes can be stored in the Registry. +The operation \texttt{registerRemoteAsync} can register processes at remote nodes. +When a message is sent to a remote node using \texttt{nsendRemote} there is no +guarantee that the process that should receive the message is located at the +node; thus it may be necessary to relay the message to a process on yet another node. + +Both operations \texttt{nsend} and \texttt{nsendRemote} discard the messages if +no process is registered with the given name, or already dead. + +Both \texttt{nsend} and \texttt{nsendRemote} guarantee ordering between messages +sent between two processes using one of those mechanism, however ordering between +messages sent by \texttt{send} and \texttt{nsend} is not preserved. + +Current implementation for monitoring processes that are stored in registry +are left unspecified, this may be changed in future. + \bibliographystyle{apalike} \bibliography{references} diff --git a/src/Control/Distributed/Process/Internal/Primitives.hs b/src/Control/Distributed/Process/Internal/Primitives.hs index e73783eb..3ea65bf3 100644 --- a/src/Control/Distributed/Process/Internal/Primitives.hs +++ b/src/Control/Distributed/Process/Internal/Primitives.hs @@ -71,6 +71,7 @@ module Control.Distributed.Process.Internal.Primitives , unlink , monitor , unmonitor + , unmonitorAsync , withMonitor -- * Logging , say @@ -121,6 +122,7 @@ import Prelude hiding (catch) #endif import Data.Binary (decode) +import Data.Foldable (traverse_) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (formatTime) import System.Locale (defaultTimeLocale) @@ -1110,8 +1112,7 @@ whereisRemoteAsync nid label = -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -nsend label msg = - sendCtrlMsg Nothing (NamedSend label (createUnencodedMessage msg)) +nsend label msg = traverse_ (`send` msg) =<< whereis label -- | Named send to a process in the local registry (asynchronous). -- This function makes /no/ attempt to serialize and (in the case when the diff --git a/src/Control/Distributed/Process/Node.hs b/src/Control/Distributed/Process/Node.hs index 0b1dbac6..4a6a8336 100644 --- a/src/Control/Distributed/Process/Node.hs +++ b/src/Control/Distributed/Process/Node.hs @@ -68,6 +68,7 @@ import Control.Exception ) import qualified Control.Exception as Exception (Handler(..), catches, finally) import Control.Concurrent (forkIO, forkIOWithUnmask, myThreadId) +import Control.Distributed.Process.Node.RegistryAgent (registryMonitorAgent) import Control.Distributed.Process.Internal.StrictMVar ( newMVar , withMVar @@ -159,7 +160,8 @@ import Control.Distributed.Process.Internal.Types , createUnencodedMessage , runLocalProcess , firstNonReservedProcessId - , ImplicitReconnect(WithImplicitReconnect) + , ImplicitReconnect(NoImplicitReconnect, WithImplicitReconnect) + , messageToPayload ) import Control.Distributed.Process.Management.Internal.Agent ( mxAgentController @@ -187,6 +189,7 @@ import Control.Distributed.Process.Serializable (Serializable) import Control.Distributed.Process.Internal.Messaging ( sendBinary , sendMessage + , sendPayload , closeImplicitReconnections , impliesDeathOf ) @@ -296,6 +299,7 @@ startServiceProcesses node = do runProcess node $ register Table.mxTableCoordinator tableCoordinatorPid logger <- forkProcess node loop runProcess node $ register "logger" logger + runProcess node $ void $ registryMonitorAgent where fork = forkProcess node @@ -686,8 +690,8 @@ nodeController = do ncEffectRegister from label atnode pid force NCMsg (ProcessIdentifier from) (WhereIs label) -> ncEffectWhereIs from label - NCMsg _ (NamedSend label msg') -> - ncEffectNamedSend label msg' + NCMsg (ProcessIdentifier from) (NamedSend label msg') -> + ncEffectNamedSend from label msg' NCMsg _ (LocalSend to msg') -> ncEffectLocalSend node to msg' NCMsg _ (LocalPortSend to msg') -> @@ -904,11 +908,19 @@ ncEffectWhereIs from label = do (WhereIsReply label mPid) -- [Unified: Table 14] -ncEffectNamedSend :: String -> Message -> NC () -ncEffectNamedSend label msg = do +ncEffectNamedSend :: ProcessId -> String -> Message -> NC () +ncEffectNamedSend from label msg = do mPid <- gets (^. registeredHereFor label) -- If mPid is Nothing, we just ignore the named send (as per Table 14) - forM_ mPid $ \pid -> postMessage pid msg + node <- ask + forM_ mPid $ \pid -> + if isLocal node (ProcessIdentifier pid) + then postMessage pid msg + else liftIO $ sendPayload node + (ProcessIdentifier from) + (ProcessIdentifier pid) + NoImplicitReconnect + (messageToPayload msg) -- [Issue #DP-20] ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC () diff --git a/src/Control/Distributed/Process/Node/RegistryAgent.hs b/src/Control/Distributed/Process/Node/RegistryAgent.hs new file mode 100644 index 00000000..05228223 --- /dev/null +++ b/src/Control/Distributed/Process/Node/RegistryAgent.hs @@ -0,0 +1,64 @@ +{-# LANGUAGE BangPatterns #-} +----------------------------------------------------------------------------- +---- | +---- Module : Control.Distributed.Process.Node.RegistryAgent +---- Copyright : (c) Tweag I/O 2015 +---- License : BSD3 (see the file LICENSE) +---- +---- Maintainer : Tim Watson +---- Stability : experimental +---- Portability : non-portable (requires concurrency) +---- +---- This module provides registry monitoring agent, implemented as +---- a /distributed-process Management Agent/. Once 'node' starts it run this +---- agent, such agent will monitor every remove process that is added to node +---- and remove Processes from registry if they die. +---- +------------------------------------------------------------------------------- + +module Control.Distributed.Process.Node.RegistryAgent + ( registryMonitorAgent + ) where + +import Control.Distributed.Process.Management +import Control.Distributed.Process.Internal.Types +import Control.Distributed.Process.Internal.Primitives +import Data.Foldable (forM_) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map + +registryMonitorAgentId :: MxAgentId +registryMonitorAgentId = MxAgentId "service.registry.monitoring" + +registryMonitorAgent :: Process ProcessId +registryMonitorAgent = do + mxAgent registryMonitorAgentId initState + [ mxSink $ \(ProcessMonitorNotification _ pid _) -> do + mxUpdateLocal (Map.delete pid) + mxReady + , mxSink $ \ev -> + let act = case ev of + MxRegistered pid _ -> do + hm <- mxGetLocal + case pid `Map.lookup` hm of + Nothing -> do + mon <- liftMX $ monitor pid + mxUpdateLocal (Map.insert pid (mon, 1)) + Just _ -> return () + MxUnRegistered pid _ -> do + hm <- mxGetLocal + forM_ (pid `Map.lookup` hm) $ \(mref, i) -> + let !i' = succ i + in if i' == 0 + then do liftMX $ unmonitorAsync mref + mxSetLocal $! pid `Map.delete` hm + else mxSetLocal $ Map.insert pid (mref,i') hm + _ -> return () + in act >> mxReady + -- remove async answers from mailbox + , mxSink $ \RegisterReply{} -> mxReady + , mxSink $ \DidUnmonitor{} -> mxReady + ] + where + initState :: Map ProcessId (MonitorRef,Int) + initState = Map.empty