Skip to content

Commit 9cfdae3

Browse files
authored
smp server: uniterruptible mask for DB operations (#1635)
1 parent 6218369 commit 9cfdae3

File tree

1 file changed

+27
-18
lines changed

1 file changed

+27
-18
lines changed

src/Simplex/Messaging/Server/MsgStore/Postgres.hs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module Simplex.Messaging.Server.MsgStore.Postgres
2727
where
2828

2929
import Control.Concurrent.STM
30+
import qualified Control.Exception as E
3031
import Control.Monad
3132
import Control.Monad.Reader
3233
import Control.Monad.Trans.Except
@@ -161,15 +162,16 @@ instance MsgStoreClass PostgresMsgStore where
161162

162163
writeMsg :: PostgresMsgStore -> PostgresQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
163164
writeMsg ms q _ msg =
164-
withDB' "writeMsg" (queueStore_ ms) $ \db -> do
165-
let (msgQuota, ntf, body) = case msg of
166-
Message {msgFlags = MsgFlags ntf', msgBody = C.MaxLenBS body'} -> (False, ntf', body')
167-
MessageQuota {} -> (True, False, B.empty)
168-
toResult <$>
169-
DB.query
170-
db
171-
"SELECT quota_written, was_empty FROM write_message(?,?,?,?,?,?,?)"
172-
(recipientId' q, Binary (messageId msg), systemSeconds (messageTs msg), msgQuota, ntf, Binary body, quota)
165+
uninterruptibleMask_ $
166+
withDB' "writeMsg" (queueStore_ ms) $ \db -> do
167+
let (msgQuota, ntf, body) = case msg of
168+
Message {msgFlags = MsgFlags ntf', msgBody = C.MaxLenBS body'} -> (False, ntf', body')
169+
MessageQuota {} -> (True, False, B.empty)
170+
toResult <$>
171+
DB.query
172+
db
173+
"SELECT quota_written, was_empty FROM write_message(?,?,?,?,?,?,?)"
174+
(recipientId' q, Binary (messageId msg), systemSeconds (messageTs msg), msgQuota, ntf, Binary body, quota)
173175
where
174176
toResult = \case
175177
((msgQuota, wasEmpty) : _) -> if msgQuota then Nothing else Just (msg, wasEmpty)
@@ -206,7 +208,7 @@ instance MsgStoreClass PostgresMsgStore where
206208
tryDeleteMsg_ _q _ _ = error "tryDeleteMsg_ not used" -- do
207209

208210
isolateQueue :: PostgresMsgStore -> PostgresQueue -> Text -> DBStoreIO a -> ExceptT ErrorType IO a
209-
isolateQueue ms _q op a = withDB' op (queueStore_ ms) $ runReaderT a . DBTransaction
211+
isolateQueue ms _q op a = uninterruptibleMask_ $ withDB' op (queueStore_ ms) $ runReaderT a . DBTransaction
210212

211213
unsafeRunStore _ _ _ = error "unsafeRunStore not used"
212214

@@ -216,15 +218,17 @@ instance MsgStoreClass PostgresMsgStore where
216218

217219
tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message)
218220
tryDelMsg ms q msgId =
219-
withDB' "tryDelMsg" (queueStore_ ms) $ \db ->
220-
maybeFirstRow toMessage $
221-
DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_msg(?, ?)" (recipientId' q, Binary msgId)
221+
uninterruptibleMask_ $
222+
withDB' "tryDelMsg" (queueStore_ ms) $ \db ->
223+
maybeFirstRow toMessage $
224+
DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_msg(?, ?)" (recipientId' q, Binary msgId)
222225

223226
tryDelPeekMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
224227
tryDelPeekMsg ms q msgId =
225-
withDB' "tryDelPeekMsg" (queueStore_ ms) $ \db ->
226-
toResult . map toMessage
227-
<$> DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_peek_msg(?, ?)" (recipientId' q, Binary msgId)
228+
uninterruptibleMask_ $
229+
withDB' "tryDelPeekMsg" (queueStore_ ms) $ \db ->
230+
toResult . map toMessage
231+
<$> DB.query db "SELECT r_msg_id, r_msg_ts, r_msg_quota, r_msg_ntf_flag, r_msg_body FROM try_del_peek_msg(?, ?)" (recipientId' q, Binary msgId)
228232
where
229233
toResult = \case
230234
[] -> (Nothing, Nothing)
@@ -235,8 +239,13 @@ instance MsgStoreClass PostgresMsgStore where
235239

236240
deleteExpiredMsgs :: PostgresMsgStore -> PostgresQueue -> Int64 -> ExceptT ErrorType IO Int
237241
deleteExpiredMsgs ms q old =
238-
maybeFirstRow' 0 (fromIntegral @Int64 . fromOnly) $ withDB' "deleteExpiredMsgs" (queueStore_ ms) $ \db ->
239-
DB.query db "SELECT delete_expired_msgs(?, ?)" (recipientId' q, old)
242+
uninterruptibleMask_ $
243+
maybeFirstRow' 0 (fromIntegral @Int64 . fromOnly) $ withDB' "deleteExpiredMsgs" (queueStore_ ms) $ \db ->
244+
DB.query db "SELECT delete_expired_msgs(?, ?)" (recipientId' q, old)
245+
246+
uninterruptibleMask_ :: ExceptT ErrorType IO a -> ExceptT ErrorType IO a
247+
uninterruptibleMask_ = ExceptT . E.uninterruptibleMask_ . runExceptT
248+
{-# INLINE uninterruptibleMask_ #-}
240249

241250
toMessage :: (Binary MsgId, Int64, Bool, Bool, Binary MsgBody) -> Message
242251
toMessage (Binary msgId, ts, msgQuota, ntf, Binary body)

0 commit comments

Comments
 (0)