@@ -684,24 +684,29 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
684684 let threadsCount = 0
685685#endif
686686 clientsCount <- IM. size <$> getServerClients srv
687- deliveredSubs <- getDeliveredMetrics
687+ ( deliveredSubs, deliveredTimes) <- getDeliveredMetrics =<< getSystemSeconds
688688 smpSubs <- getSubscribersMetrics subscribers
689689 ntfSubs <- getSubscribersMetrics ntfSubscribers
690690 loadedCounts <- loadedQueueCounts $ fromMsgStore ms
691- pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts}
691+ pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts}
692692 where
693693 getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do
694694 subsCount <- M. size <$> getSubscribedClients queueSubscribers
695695 subClientsCount <- IS. size <$> readTVarIO subClients
696696 subServicesCount <- M. size <$> getSubscribedClients serviceSubscribers
697697 pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount}
698- getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0 ) =<< getServerClients srv
699- countClnt metrics Client {subscriptions} = do
700- cnt <- foldM countSubs 0 =<< readTVarIO subscriptions
701- pure $ if cnt > 0
702- then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1 }
703- else metrics
704- countSubs ! cnt Sub {delivered} = (\ empty -> if empty then cnt else cnt + 1 ) <$> atomically (isEmptyTMVar delivered)
698+ getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0 , emptyTimeBuckets) =<< getServerClients srv
699+ where
700+ countClnt acc@ (metrics, times) Client {subscriptions} = do
701+ (cnt, times') <- foldM countSubs (0 , times) =<< readTVarIO subscriptions
702+ pure $ if cnt > 0
703+ then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1 }, times')
704+ else acc
705+ countSubs acc@ (! cnt, times) Sub {delivered} = do
706+ delivered_ <- atomically $ tryReadTMVar delivered
707+ pure $ case delivered_ of
708+ Nothing -> acc
709+ Just (_, ts) -> (cnt + 1 , updateTimeBuckets ts ts' times)
705710
706711 runClient :: Transport c => X. CertificateChain -> C. APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s ()
707712 runClient srvCert srvSignKey tp h = do
@@ -1633,15 +1638,16 @@ client
16331638 -- This is tracked as "subscription" in the client to prevent these
16341639 -- clients from being able to subscribe.
16351640 pure s
1636- getMessage_ :: Sub -> Maybe MsgId -> M s (Transmission BrokerMsg )
1641+ getMessage_ :: Sub -> Maybe ( MsgId , RoundedSystemTime ) -> M s (Transmission BrokerMsg )
16371642 getMessage_ s delivered_ = do
16381643 stats <- asks serverStats
16391644 fmap (either err id ) $ liftIO $ runExceptT $
16401645 tryPeekMsg ms q >>= \ case
16411646 Just msg -> do
16421647 let encMsg = encryptMsg qr msg
16431648 incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats
1644- atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg)
1649+ ts <- liftIO getSystemSeconds
1650+ atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg)
16451651 Nothing -> incStat (msgGetNoMsg stats) $> ok
16461652
16471653 withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg )) -> M s (Transmission BrokerMsg )
@@ -1734,28 +1740,28 @@ client
17341740 Nothing -> pure $ err NO_MSG
17351741 Just sub ->
17361742 atomically (getDelivered sub) >>= \ case
1737- Just st -> do
1743+ Just (st, ts) -> do
17381744 stats <- asks serverStats
17391745 fmap (either err id ) $ liftIO $ runExceptT $ do
17401746 case st of
17411747 ProhibitSub -> do
17421748 deletedMsg_ <- tryDelMsg ms q msgId
1743- liftIO $ mapM_ (updateStats stats True ) deletedMsg_
1749+ liftIO $ mapM_ (updateStats stats True ts ) deletedMsg_
17441750 pure ok
17451751 _ -> do
17461752 (deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId
1747- liftIO $ mapM_ (updateStats stats False ) deletedMsg_
1753+ liftIO $ mapM_ (updateStats stats False ts ) deletedMsg_
17481754 liftIO $ deliverMessage " ACK" qr entId sub msg_
17491755 _ -> pure $ err NO_MSG
17501756 where
1751- getDelivered :: Sub -> STM (Maybe ServerSub )
1757+ getDelivered :: Sub -> STM (Maybe ( ServerSub , RoundedSystemTime ) )
17521758 getDelivered Sub {delivered, subThread} = do
1753- tryTakeTMVar delivered $>>= \ msgId' ->
1759+ tryTakeTMVar delivered $>>= \ v @ ( msgId', ts) ->
17541760 if msgId == msgId' || B. null msgId
1755- then pure $ Just subThread
1756- else putTMVar delivered msgId' $> Nothing
1757- updateStats :: ServerStats -> Bool -> Message -> IO ()
1758- updateStats stats isGet = \ case
1761+ then pure $ Just ( subThread, ts)
1762+ else putTMVar delivered v $> Nothing
1763+ updateStats :: ServerStats -> Bool -> RoundedSystemTime -> Message -> IO ()
1764+ updateStats stats isGet deliveryTime = \ case
17591765 MessageQuota {} -> pure ()
17601766 Message {msgFlags} -> do
17611767 incStat $ msgRecv stats
@@ -1772,6 +1778,8 @@ client
17721778 when (notification msgFlags) $ do
17731779 incStat $ msgRecvNtf stats
17741780 updatePeriodStats (activeQueuesNtf stats) entId
1781+ currTime <- getSystemSeconds
1782+ atomicModifyIORef'_ (msgRecvAckTimes stats) $ updateTimeBuckets deliveryTime currTime
17751783
17761784 sendMessage :: MsgFlags -> MsgBody -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg )
17771785 sendMessage msgFlags msgBody q qr
@@ -1839,33 +1847,35 @@ client
18391847 -- the subscribed client var is read outside of STM to avoid transaction cost
18401848 -- in case no client is subscribed.
18411849 getSubscribedClient rId (queueSubscribers subscribers)
1842- $>>= atomically . deliverToSub
1850+ $>>= deliverToSub
18431851 >>= mapM_ forkDeliver
18441852 where
18451853 rId = recipientId q
1846- deliverToSub rcv =
1854+ deliverToSub rcv = do
1855+ ts <- getSystemSeconds
1856+ atomically $
18471857 -- reading client TVar in the same transaction,
18481858 -- so that if subscription ends, it re-evalutates
18491859 -- and delivery is cancelled -
18501860 -- the new client will receive message in response to SUB.
1851- readTVar rcv
1852- $>>= \ rc@ Client {subscriptions = subs, sndQ = sndQ'} -> TM. lookup rId subs
1853- $>>= \ s@ Sub {subThread, delivered} -> case subThread of
1854- ProhibitSub -> pure Nothing
1855- ServerSub st -> readTVar st >>= \ case
1856- NoSub ->
1857- tryTakeTMVar delivered >>= \ case
1858- Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
1859- Nothing ->
1860- ifM
1861- (isFullTBQueue sndQ')
1862- (writeTVar st SubPending $> Just (rc, s, st))
1863- (deliver sndQ' s $> Nothing )
1864- _ -> pure Nothing
1865- deliver sndQ' s = do
1861+ readTVar rcv
1862+ $>>= \ rc@ Client {subscriptions = subs, sndQ = sndQ'} -> TM. lookup rId subs
1863+ $>>= \ s@ Sub {subThread, delivered} -> case subThread of
1864+ ProhibitSub -> pure Nothing
1865+ ServerSub st -> readTVar st >>= \ case
1866+ NoSub ->
1867+ tryTakeTMVar delivered >>= \ case
1868+ Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
1869+ Nothing ->
1870+ ifM
1871+ (isFullTBQueue sndQ')
1872+ (writeTVar st SubPending $> Just (rc, s, st))
1873+ (deliver sndQ' s ts $> Nothing )
1874+ _ -> pure Nothing
1875+ deliver sndQ' s ts = do
18661876 let encMsg = encryptMsg qr msg
18671877 writeTBQueue sndQ' [(CorrId " " , rId, MSG encMsg)]
1868- void $ setDelivered s msg
1878+ void $ setDelivered s msg ts
18691879 forkDeliver (rc@ Client {sndQ = sndQ'}, s@ Sub {delivered}, st) = do
18701880 t <- mkWeakThreadId =<< forkIO deliverThread
18711881 atomically $ modifyTVar' st $ \ case
@@ -1878,13 +1888,14 @@ client
18781888 -- lookup can be outside of STM transaction,
18791889 -- as long as the check that it is the same client is inside.
18801890 getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
1881- deliverIfSame rcv = time " deliver" . atomically $
1882- whenM (sameClient rc rcv) $
1891+ deliverIfSame rcv = time " deliver" $ do
1892+ ts <- getSystemSeconds
1893+ atomically $ whenM (sameClient rc rcv) $
18831894 tryTakeTMVar delivered >>= \ case
18841895 Just _ -> pure () -- if a message was already delivered, should not deliver more
18851896 Nothing -> do
18861897 -- a separate thread is needed because it blocks when client sndQ is full.
1887- deliver sndQ' s
1898+ deliver sndQ' s ts
18881899 writeTVar st NoSub
18891900
18901901 enqueueNotification :: NtfCreds -> Message -> M s ()
@@ -1958,13 +1969,14 @@ client
19581969 VRFailed e -> Left (corrId', entId', ERR e)
19591970
19601971 deliverMessage :: T. Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg )
1961- deliverMessage name qr rId s@ Sub {subThread} msg_ = time (name <> " deliver" ) . atomically $
1972+ deliverMessage name qr rId s@ Sub {subThread} msg_ = time (name <> " deliver" ) $
19621973 case subThread of
19631974 ProhibitSub -> pure resp
19641975 _ -> case msg_ of
1965- Just msg ->
1976+ Just msg -> do
1977+ ts <- getSystemSeconds
19661978 let encMsg = encryptMsg qr msg
1967- in setDelivered s msg $> (corrId, rId, MSG encMsg)
1979+ atomically ( setDelivered s msg ts) $> (corrId, rId, MSG encMsg)
19681980 _ -> pure resp
19691981 where
19701982 resp = (corrId, rId, OK )
@@ -1982,8 +1994,10 @@ client
19821994 msgId' = messageId msg
19831995 msgTs' = messageTs msg
19841996
1985- setDelivered :: Sub -> Message -> STM Bool
1986- setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg
1997+ setDelivered :: Sub -> Message -> RoundedSystemTime -> STM Bool
1998+ setDelivered Sub {delivered} msg ! ts = do
1999+ let ! msgId = messageId msg
2000+ tryPutTMVar delivered (msgId, ts)
19872001
19882002 delQueueAndMsgs :: (StoreQueue s , QueueRec ) -> M s (Transmission BrokerMsg )
19892003 delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do
@@ -2024,7 +2038,7 @@ client
20242038 SubPending -> QSubPending
20252039 SubThread _ -> QSubThread
20262040 ProhibitSub -> pure QProhibitSub
2027- qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered
2041+ qDelivered <- atomically $ decodeLatin1 . encode . fst <$$> tryReadTMVar delivered
20282042 pure QSub {qSubThread, qDelivered}
20292043
20302044 ok :: Transmission BrokerMsg
0 commit comments