diff --git a/distributed-process.cabal b/distributed-process.cabal index 5f3871b0..2a9cc918 100644 --- a/distributed-process.cabal +++ b/distributed-process.cabal @@ -76,6 +76,7 @@ Library Control.Distributed.Process.Internal.WeakTQueue, Control.Distributed.Process.Management, Control.Distributed.Process.Node, + Control.Distributed.Process.Node.RegistryAgent Control.Distributed.Process.Serializable, Control.Distributed.Process.UnsafePrimitives Control.Distributed.Process.Management.Internal.Agent, diff --git a/src/Control/Distributed/Process/Internal/Primitives.hs b/src/Control/Distributed/Process/Internal/Primitives.hs index 5600d3ae..00a1f2eb 100644 --- a/src/Control/Distributed/Process/Internal/Primitives.hs +++ b/src/Control/Distributed/Process/Internal/Primitives.hs @@ -72,6 +72,7 @@ module Control.Distributed.Process.Internal.Primitives , unlink , monitor , unmonitor + , unmonitorAsync , withMonitor -- * Logging , say diff --git a/src/Control/Distributed/Process/Management/Internal/Types.hs b/src/Control/Distributed/Process/Management/Internal/Types.hs index 7443aef2..49b24658 100644 --- a/src/Control/Distributed/Process/Management/Internal/Types.hs +++ b/src/Control/Distributed/Process/Management/Internal/Types.hs @@ -30,6 +30,7 @@ import Control.Distributed.Process.Internal.Types , NodeId ) import Control.Monad.IO.Class (MonadIO) +import Control.Monad.Fix (MonadFix) import qualified Control.Monad.State as ST ( MonadState , StateT @@ -123,6 +124,7 @@ newtype MxAgent s a = } deriving ( Functor , Monad , MonadIO + , MonadFix , ST.MonadState (MxAgentState s) , Typeable , Applicative diff --git a/src/Control/Distributed/Process/Node.hs b/src/Control/Distributed/Process/Node.hs index 5830f6fe..00128177 100644 --- a/src/Control/Distributed/Process/Node.hs +++ b/src/Control/Distributed/Process/Node.hs @@ -68,6 +68,7 @@ import Control.Exception ) import qualified Control.Exception as Exception (Handler(..), catches, finally) import Control.Concurrent (forkIO, forkIOWithUnmask, killThread, myThreadId) +import Control.Distributed.Process.Node.RegistryAgent (registryMonitorAgent) import Control.Distributed.Process.Internal.StrictMVar ( newMVar , withMVar @@ -306,6 +307,7 @@ startServiceProcesses node = do -- loops during tracing if the user reregisters the "logger" with a custom -- process which uses 'send' or other primitives which are traced. register "trace.logger" logger + void $ registryMonitorAgent where fork = forkProcess node diff --git a/src/Control/Distributed/Process/Node/RegistryAgent.hs b/src/Control/Distributed/Process/Node/RegistryAgent.hs new file mode 100644 index 00000000..a64b1b69 --- /dev/null +++ b/src/Control/Distributed/Process/Node/RegistryAgent.hs @@ -0,0 +1,80 @@ +{-# LANGUAGE RecursiveDo #-} +----------------------------------------------------------------------------- +---- | +---- Module : Control.Distributed.Process.Node.RegistryAgent +---- Copyright : (c) Tweag I/O 2015 +---- License : BSD3 (see the file LICENSE) +---- +---- Maintainer : Tim Watson +---- Stability : experimental +---- Portability : non-portable (requires concurrency) +---- +---- This module provides a registry monitoring agent, implemented as a +---- /distributed-process Management Agent/. Every 'node' starts this agent on +---- startup. The agent will monitor every remote process that was added to the +---- local registry, so the node removes the process from the registry when it +---- dies or when a network failure is detected. +---- +------------------------------------------------------------------------------- + +module Control.Distributed.Process.Node.RegistryAgent + ( registryMonitorAgent + ) where + +import Control.Distributed.Process.Management +import Control.Distributed.Process.Internal.Types +import Control.Distributed.Process.Internal.Primitives +import Data.Foldable (forM_) +import Data.Map (Map) +import qualified Data.Map as Map + +registryMonitorAgentId :: MxAgentId +registryMonitorAgentId = MxAgentId "service.registry.monitoring" + +-- | Registry monitor agent +-- +-- This agent listens for 'MxRegistered' and 'MxUnRegistered' events and tracks +-- all remote 'ProcessId's that are stored in the registry. +-- +-- When a remote process is registered, the agent starts monitoring it until it +-- is unregistered or the monitor notification arrives. +-- +-- The agent keeps the amount of labels associated with each registered remote +-- process. This is necessary so the process is unmonitored only when it has +-- been unregistered from all of the labels. +-- +registryMonitorAgent :: Process ProcessId +registryMonitorAgent = do + nid <- getSelfNode + -- For each process the map associates the 'MonitorRef' used to monitor it and + -- the amount of labels associated with it. + mxAgent registryMonitorAgentId (Map.empty :: Map ProcessId (MonitorRef, Int)) + [ mxSink $ \(ProcessMonitorNotification _ pid _) -> do + mxUpdateLocal (Map.delete pid) + mxReady + , mxSink $ \ev -> do + case ev of + MxRegistered pid _ + | processNodeId pid /= nid -> do + hm <- mxGetLocal + m <- liftMX $ mdo + let (v,m) = Map.insertLookupWithKey (\_ (m',r) _ -> (m',r+1)) + pid (mref,1) hm + mref <- maybe (monitor pid) (return . fst) v + return m + mxSetLocal m + MxUnRegistered pid _ + | processNodeId pid /= nid -> do + hm <- mxGetLocal + forM_ (pid `Map.lookup` hm) $ \(mref, i) -> + let i' = pred i + in if i' == 0 + then do liftMX $ unmonitorAsync mref + mxSetLocal $! pid `Map.delete` hm + else mxSetLocal $ Map.insert pid (mref,i') hm + _ -> return () + mxReady + -- remove async answers from mailbox + , mxSink $ \RegisterReply{} -> mxReady + , mxSink $ \DidUnmonitor{} -> mxReady + ]