@@ -108,8 +108,12 @@ data JournalQueue = JournalQueue
108108 msgQueue_ :: TVar (Maybe JournalMsgQueue ),
109109 -- system time in seconds since epoch
110110 activeAt :: TVar Int64 ,
111- -- Just True - empty, Just False - non-empty, Nothing - unknown
112- isEmpty :: TVar (Maybe Bool )
111+ queueState :: TVar (Maybe QState ) -- Nothing - unknown
112+ }
113+
114+ data QState = QState
115+ { hasPending :: Bool ,
116+ hasStored :: Bool
113117 }
114118
115119data JMQueue = JMQueue
@@ -152,6 +156,12 @@ data JournalState t = JournalState
152156 }
153157 deriving (Show )
154158
159+ qState :: MsgQueueState -> QState
160+ qState MsgQueueState {size, readState = rs, writeState = ws} =
161+ let hasPending = size > 0
162+ in QState {hasPending, hasStored = hasPending || msgCount rs > 0 || msgCount ws > 0 }
163+ {-# INLINE qState #-}
164+
155165data JournalType = JTRead | JTWrite
156166
157167data SJournalType (t :: JournalType ) where
@@ -224,12 +234,20 @@ newtype StoreIO a = StoreIO {unStoreIO :: IO a}
224234instance STMStoreClass JournalMsgStore where
225235 stmQueueStore JournalMsgStore {queueStore} = queueStore
226236 mkQueue st rId qr = do
227- lock <- getMapLock (queueLocks st) rId
228- q <- newTVar $ Just qr
229- mq <- newTVar Nothing
237+ queueLock <- getMapLock (queueLocks st) rId
238+ queueRec <- newTVar $ Just qr
239+ msgQueue_ <- newTVar Nothing
230240 activeAt <- newTVar 0
231- isEmpty <- newTVar Nothing
232- pure $ JournalQueue rId lock q mq activeAt isEmpty
241+ queueState <- newTVar Nothing
242+ pure $
243+ JournalQueue
244+ { recipientId = rId,
245+ queueLock,
246+ queueRec,
247+ msgQueue_,
248+ activeAt,
249+ queueState
250+ }
233251 msgQueue_' = msgQueue_
234252
235253instance MsgStoreClass JournalMsgStore where
@@ -314,7 +332,7 @@ instance MsgStoreClass JournalMsgStore where
314332 {-# INLINE queueRec' #-}
315333
316334 getMsgQueue :: JournalMsgStore -> JournalQueue -> Bool -> StoreIO JournalMsgQueue
317- getMsgQueue ms@ JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} forWrite =
335+ getMsgQueue ms@ JournalMsgStore {random} q' @ JournalQueue {recipientId = rId, msgQueue_} forWrite =
318336 StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure
319337 where
320338 newQ = do
@@ -323,6 +341,8 @@ instance MsgStoreClass JournalMsgStore where
323341 queue = JMQueue {queueDirectory = dir, statePath}
324342 q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue)
325343 atomically $ writeTVar msgQueue_ $ Just q
344+ st <- readTVarIO $ state q
345+ atomically $ writeTVar (queueState q') $ Just $! qState st
326346 pure q
327347 where
328348 createQ :: JMQueue -> IO JournalMsgQueue
@@ -333,10 +353,9 @@ instance MsgStoreClass JournalMsgStore where
333353 mkJournalQueue queue (newMsgQueueState journalId) Nothing
334354
335355 getPeekMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue , Message ))
336- getPeekMsgQueue ms q@ JournalQueue {isEmpty} =
337- StoreIO (readTVarIO isEmpty) >>= \ case
338- Just True -> pure Nothing
339- Just False -> peek
356+ getPeekMsgQueue ms q@ JournalQueue {queueState} =
357+ StoreIO (readTVarIO queueState) >>= \ case
358+ Just QState {hasPending} -> if hasPending then peek else pure Nothing
340359 Nothing -> do
341360 -- We only close the queue if we just learnt it's empty.
342361 -- This is needed to reduce file descriptors and memory usage
@@ -353,15 +372,15 @@ instance MsgStoreClass JournalMsgStore where
353372
354373 -- only runs action if queue is not empty
355374 withIdleMsgQueue :: Int64 -> JournalMsgStore -> JournalQueue -> (JournalMsgQueue -> StoreIO a ) -> StoreIO (Maybe a , Int )
356- withIdleMsgQueue now ms@ JournalMsgStore {config} q action =
375+ withIdleMsgQueue now ms@ JournalMsgStore {config} q@ JournalQueue {queueState} action =
357376 StoreIO $ readTVarIO (msgQueue_ q) >>= \ case
358377 Nothing ->
359378 E. bracket
360- (unStoreIO $ getPeekMsgQueue ms q)
379+ getNonEmptyMsgQueue
361380 (mapM_ $ \ _ -> closeMsgQueue q)
362381 (maybe (pure (Nothing , 0 )) (unStoreIO . run))
363382 where
364- run (mq, _) = do
383+ run mq = do
365384 r <- action mq
366385 sz <- getQueueSize_ mq
367386 pure (Just r, sz)
@@ -372,6 +391,19 @@ instance MsgStoreClass JournalMsgStore where
372391 else pure Nothing
373392 sz <- unStoreIO $ getQueueSize_ mq
374393 pure (r, sz)
394+ where
395+ getNonEmptyMsgQueue :: IO (Maybe JournalMsgQueue )
396+ getNonEmptyMsgQueue =
397+ readTVarIO queueState >>= \ case
398+ Just QState {hasStored}
399+ | hasStored -> Just <$> unStoreIO (getMsgQueue ms q False )
400+ | otherwise -> pure Nothing
401+ Nothing -> do
402+ mq <- unStoreIO $ getMsgQueue ms q False
403+ -- queueState was updated in getMsgQueue
404+ readTVarIO queueState >>= \ case
405+ Just QState {hasStored} | not hasStored -> closeMsgQueue q $> Nothing
406+ _ -> pure $ Just mq
375407
376408 deleteQueue :: JournalMsgStore -> JournalQueue -> IO (Either ErrorType QueueRec )
377409 deleteQueue ms q = fst <$$> deleteQueue_ ms q
@@ -383,15 +415,15 @@ instance MsgStoreClass JournalMsgStore where
383415 where
384416 getSize = maybe (pure (- 1 )) (fmap size . readTVarIO . state)
385417
386- getQueueMessages_ :: Bool -> JournalMsgQueue -> StoreIO [Message ]
387- getQueueMessages_ drainMsgs q = StoreIO (run [] )
418+ getQueueMessages_ :: Bool -> JournalQueue -> JournalMsgQueue -> StoreIO [Message ]
419+ getQueueMessages_ drainMsgs q' q = StoreIO (run [] )
388420 where
389421 run msgs = readTVarIO (handles q) >>= maybe (pure [] ) (getMsg msgs)
390- getMsg msgs hs = chooseReadJournal q drainMsgs hs >>= maybe (pure msgs) readMsg
422+ getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg
391423 where
392424 readMsg (rs, h) = do
393425 (msg, len) <- hGetMsgAt h $ bytePos rs
394- updateReadPos q drainMsgs len hs
426+ updateReadPos q' q drainMsgs len hs
395427 (msg : ) <$> run msgs
396428
397429 writeMsg :: JournalMsgStore -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message , Bool ))
@@ -402,7 +434,6 @@ instance MsgStoreClass JournalMsgStore where
402434 let empty = size == 0
403435 if canWrite || empty
404436 then do
405- atomically $ writeTVar (isEmpty q') (Just False )
406437 let canWrt' = quota > size
407438 if canWrt'
408439 then writeToJournal q st canWrt' msg $> Just (msg, empty)
@@ -424,7 +455,7 @@ instance MsgStoreClass JournalMsgStore where
424455 rs' = if journalId ws == journalId rs then rs {msgCount = msgPos', byteCount = bytePos'} else rs
425456 ! st' = st {writeState = ws', readState = rs', canWrite = canWrt', size = size + 1 }
426457 hAppend wh (bytePos ws) msgStr
427- updateQueueState q logState hs st' $
458+ updateQueueState q' q logState hs st' $
428459 when (size == 0 ) $ writeTVar (tipMsg q) $ Just (Just (msg, msgLen))
429460 where
430461 JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q
@@ -452,25 +483,22 @@ instance MsgStoreClass JournalMsgStore where
452483
453484 tryPeekMsg_ :: JournalQueue -> JournalMsgQueue -> StoreIO (Maybe Message )
454485 tryPeekMsg_ q mq@ JournalMsgQueue {tipMsg, handles} =
455- StoreIO $ (readTVarIO handles $>>= chooseReadJournal mq True $>>= peekMsg) >>= setEmpty
486+ StoreIO $ (readTVarIO handles $>>= chooseReadJournal q mq True $>>= peekMsg)
456487 where
457488 peekMsg (rs, h) = readTVarIO tipMsg >>= maybe readMsg (pure . fmap fst )
458489 where
459490 readMsg = do
460491 ml@ (msg, _) <- hGetMsgAt h $ bytePos rs
461492 atomically $ writeTVar tipMsg $ Just (Just ml)
462493 pure $ Just msg
463- setEmpty msg = do
464- atomically $ writeTVar (isEmpty q) (Just $ isNothing msg)
465- pure msg
466494
467495 tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO ()
468496 tryDeleteMsg_ q mq@ JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
469497 void $
470498 readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
471499 $>>= (pure . fmap snd )
472500 $>>= \ len -> readTVarIO handles
473- $>>= \ hs -> updateReadPos mq logState len hs $> Just ()
501+ $>>= \ hs -> updateReadPos q mq logState len hs $> Just ()
474502
475503 isolateQueue :: JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
476504 isolateQueue JournalQueue {recipientId, queueLock} op =
@@ -562,8 +590,8 @@ mkJournalQueue queue st hs_ = do
562590 -- to avoid map lookup on queue operations
563591 pure JournalMsgQueue {queue, state, tipMsg, handles}
564592
565- chooseReadJournal :: JournalMsgQueue -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle ))
566- chooseReadJournal q log' hs = do
593+ chooseReadJournal :: JournalQueue -> JournalMsgQueue -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle ))
594+ chooseReadJournal q' q log' hs = do
567595 st@ MsgQueueState {writeState = ws, readState = rs} <- readTVarIO (state q)
568596 case writeHandle hs of
569597 Just wh | msgPos rs >= msgCount rs && journalId rs /= journalId ws -> do
@@ -573,15 +601,16 @@ chooseReadJournal q log' hs = do
573601 when log' $ removeJournal (queueDirectory $ queue q) rs
574602 let ! rs' = (newJournalState $ journalId ws) {msgCount = msgCount ws, byteCount = byteCount ws}
575603 ! st' = st {readState = rs'}
576- updateQueueState q log' hs st' $ pure ()
604+ updateQueueState q' q log' hs st' $ pure ()
577605 pure $ Just (rs', wh)
578606 _ | msgPos rs >= msgCount rs && journalId rs == journalId ws -> pure Nothing
579607 _ -> pure $ Just (rs, readHandle hs)
580608
581- updateQueueState :: JournalMsgQueue -> Bool -> MsgQueueHandles -> MsgQueueState -> STM () -> IO ()
582- updateQueueState q log' hs st a = do
609+ updateQueueState :: JournalQueue -> JournalMsgQueue -> Bool -> MsgQueueHandles -> MsgQueueState -> STM () -> IO ()
610+ updateQueueState q' q log' hs st a = do
583611 unless (validQueueState st) $ E. throwIO $ userError $ " updateQueueState invalid state: " <> show st
584612 when log' $ appendState (stateHandle hs) st
613+ atomically $ writeTVar (queueState q') $ Just $! qState st
585614 atomically $ writeTVar (state q) st >> a
586615
587616appendState :: Handle -> MsgQueueState -> IO ()
@@ -591,14 +620,14 @@ appendState h = E.uninterruptibleMask_ . appendState_ h
591620appendState_ :: Handle -> MsgQueueState -> IO ()
592621appendState_ h st = B. hPutStr h $ strEncode st `B.snoc` ' \n '
593622
594- updateReadPos :: JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO ()
595- updateReadPos q log' len hs = do
623+ updateReadPos :: JournalQueue -> JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO ()
624+ updateReadPos q' q log' len hs = do
596625 st@ MsgQueueState {readState = rs, size} <- readTVarIO (state q)
597626 let JournalState {msgPos, bytePos} = rs
598627 let msgPos' = msgPos + 1
599628 rs' = rs {msgPos = msgPos', bytePos = bytePos + len}
600629 st' = st {readState = rs', size = size - 1 }
601- updateQueueState q log' hs st' $ writeTVar (tipMsg q) Nothing
630+ updateQueueState q' q log' hs st' $ writeTVar (tipMsg q) Nothing
602631
603632msgQueueDirectory :: JournalMsgStore -> RecipientId -> FilePath
604633msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathParts}} rId =
0 commit comments