@@ -91,7 +91,7 @@ import qualified Data.X509 as X
9191import qualified Data.X509.Validation as XV
9292import GHC.Conc.Signal
9393import GHC.IORef (atomicSwapIORef )
94- import GHC.Stats (getRTSStats )
94+ import GHC.Stats (RTSStats ( .. ), GCDetails ( .. ), getRTSStats )
9595import GHC.TypeLits (KnownNat )
9696import Network.Socket (ServiceName , Socket , socketToHandle )
9797import qualified Network.TLS as TLS
@@ -198,6 +198,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
198198 <> serverStatsThread_ cfg
199199 <> prometheusMetricsThread_ cfg
200200 <> controlPortThread_ cfg
201+ <> [memoryDiagThread]
201202 )
202203 `finally` stopServer s
203204 where
@@ -719,6 +720,60 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
719720 Nothing -> acc
720721 Just (_, ts) -> (cnt + 1 , updateTimeBuckets ts ts' times)
721722
723+ memoryDiagThread :: M s ()
724+ memoryDiagThread = do
725+ labelMyThread " memoryDiag"
726+ Env
727+ { ntfStore = NtfStore ntfMap,
728+ server = srv@ Server {subscribers, ntfSubscribers},
729+ proxyAgent = ProxyAgent {smpAgent = pa},
730+ msgStore_ = ms
731+ } <- ask
732+ let SMPClientAgent {smpClients, smpSessions, activeServiceSubs, activeQueueSubs, pendingServiceSubs, pendingQueueSubs, smpSubWorkers} = pa
733+ liftIO $ forever $ do
734+ threadDelay 300_000_000 -- 5 minutes
735+ rts <- getRTSStats
736+ let GCDetails {gcdetails_live_bytes, gcdetails_mem_in_use_bytes} = gc rts
737+ clientCount <- IM. size <$> getServerClients srv
738+ smpQSubs <- M. size <$> getSubscribedClients (queueSubscribers subscribers)
739+ smpSSubs <- M. size <$> getSubscribedClients (serviceSubscribers subscribers)
740+ ntfQSubs <- M. size <$> getSubscribedClients (queueSubscribers ntfSubscribers)
741+ ntfSSubs <- M. size <$> getSubscribedClients (serviceSubscribers ntfSubscribers)
742+ smpPending <- IM. size <$> readTVarIO (pendingEvents subscribers)
743+ ntfPending <- IM. size <$> readTVarIO (pendingEvents ntfSubscribers)
744+ ntfStoreSize <- M. size <$> readTVarIO ntfMap
745+ paClients' <- M. size <$> readTVarIO smpClients
746+ paSessions' <- M. size <$> readTVarIO smpSessions
747+ paActSvc <- M. size <$> readTVarIO activeServiceSubs
748+ paActQ <- M. size <$> readTVarIO activeQueueSubs
749+ paPndSvc <- M. size <$> readTVarIO pendingServiceSubs
750+ paPndQ <- M. size <$> readTVarIO pendingQueueSubs
751+ paWorkers <- M. size <$> readTVarIO smpSubWorkers
752+ lc <- loadedQueueCounts $ fromMsgStore ms
753+ logInfo $
754+ " MEMORY"
755+ <> " rts_live=" <> tshow gcdetails_live_bytes
756+ <> " rts_heap=" <> tshow gcdetails_mem_in_use_bytes
757+ <> " rts_gc=" <> tshow (gcs rts)
758+ <> " clients=" <> tshow clientCount
759+ <> " smpQSubs=" <> tshow smpQSubs
760+ <> " smpSSubs=" <> tshow smpSSubs
761+ <> " ntfQSubs=" <> tshow ntfQSubs
762+ <> " ntfSSubs=" <> tshow ntfSSubs
763+ <> " smpPending=" <> tshow smpPending
764+ <> " ntfPending=" <> tshow ntfPending
765+ <> " ntfStore=" <> tshow ntfStoreSize
766+ <> " paClients=" <> tshow paClients'
767+ <> " paSessions=" <> tshow paSessions'
768+ <> " paActSvc=" <> tshow paActSvc
769+ <> " paActQ=" <> tshow paActQ
770+ <> " paPndSvc=" <> tshow paPndSvc
771+ <> " paPndQ=" <> tshow paPndQ
772+ <> " paWorkers=" <> tshow paWorkers
773+ <> " loadedQ=" <> tshow (loadedQueueCount lc)
774+ <> " loadedNtf=" <> tshow (loadedNotifierCount lc)
775+ <> " ntfLocks=" <> tshow (notifierLockCount lc)
776+
722777 runClient :: Transport c => X. CertificateChain -> C. APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s ()
723778 runClient srvCert srvSignKey tp h = do
724779 ms <- asks msgStore
0 commit comments