Skip to content

Commit d3754b3

Browse files
authored
smp server: do not use queue cache with PostgreSQL message storage (#1637)
* smp server: do not use queue cache with PostgreSQL message storage * fix loading queues via notifier IDs
1 parent 112cd9d commit d3754b3

File tree

4 files changed

+88
-55
lines changed

4 files changed

+88
-55
lines changed

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
307307
newQueueStore = \case
308308
MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) ()
309309
#if defined(dbServerPostgres)
310-
PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) cfg
310+
PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) (cfg, True)
311311
#endif
312312

313313
closeQueueStore = withQS (closeQueueStore @(JournalQueue s))

src/Simplex/Messaging/Server/MsgStore/Postgres.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ instance MsgStoreClass PostgresMsgStore where
9797

9898
newMsgStore :: PostgresMsgStoreCfg -> IO PostgresMsgStore
9999
newMsgStore config = do
100-
queueStore_ <- newQueueStore @PostgresQueue $ queueStoreCfg config
100+
queueStore_ <- newQueueStore @PostgresQueue (queueStoreCfg config, False)
101101
pure PostgresMsgStore {config, queueStore_}
102102

103103
closeMsgStore :: PostgresMsgStore -> IO ()

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 84 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,17 @@ data PostgresQueueStore q = PostgresQueueStore
109109
notifiers :: TMap NotifierId RecipientId,
110110
notifierLocks :: TMap NotifierId Lock,
111111
serviceLocks :: TMap CertFingerprint Lock,
112-
deletedTTL :: Int64
112+
deletedTTL :: Int64,
113+
useCache :: Bool
113114
}
114115

116+
type UseQueueCache = Bool
117+
115118
instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
116-
type QueueStoreCfg (PostgresQueueStore q) = PostgresStoreCfg
119+
type QueueStoreCfg (PostgresQueueStore q) = (PostgresStoreCfg, UseQueueCache)
117120

118-
newQueueStore :: PostgresStoreCfg -> IO (PostgresQueueStore q)
119-
newQueueStore PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL} = do
121+
newQueueStore :: (PostgresStoreCfg, UseQueueCache) -> IO (PostgresQueueStore q)
122+
newQueueStore (PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL}, useCache) = do
120123
dbStore <- either err pure =<< createDBStore dbOpts serverMigrations (MigrationConfig confirmMigrations Nothing)
121124
dbStoreLog <- mapM (openWriteStoreLog True) dbStoreLogPath
122125
queues <- TM.emptyIO
@@ -125,7 +128,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
125128
notifiers <- TM.emptyIO
126129
notifierLocks <- TM.emptyIO
127130
serviceLocks <- TM.emptyIO
128-
pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, serviceLocks, deletedTTL}
131+
pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, serviceLocks, deletedTTL, useCache}
129132
where
130133
err e = do
131134
logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e
@@ -172,28 +175,35 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
172175
void $ withDB "addQueue_" st $ \db ->
173176
E.try (DB.execute db insertQueueQuery $ queueRecToRow (rId, qr))
174177
>>= bimapM handleDuplicate pure
175-
atomically $ TM.insert rId sq queues
176-
atomically $ TM.insert (senderId qr) rId senders
177-
forM_ (notifier qr) $ \NtfCreds {notifierId = nId} -> atomically $ TM.insert nId rId notifiers
178-
forM_ (queueData qr) $ \(lnkId, _) -> atomically $ TM.insert lnkId rId links
178+
when useCache $ do
179+
atomically $ TM.insert rId sq queues
180+
atomically $ TM.insert (senderId qr) rId senders
181+
forM_ (notifier qr) $ \NtfCreds {notifierId = nId} -> atomically $ TM.insert nId rId notifiers
182+
forM_ (queueData qr) $ \(lnkId, _) -> atomically $ TM.insert lnkId rId links
179183
withLog "addStoreQueue" st $ \s -> logCreateQueue s rId qr
180184
pure sq
181185
where
182-
PostgresQueueStore {queues, senders, links, notifiers} = st
186+
PostgresQueueStore {queues, senders, links, notifiers, useCache} = st
183187
-- Not doing duplicate checks in maps as the probability of duplicates is very low.
184188
-- It needs to be reconsidered when IDs are supplied by the users.
185189
-- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier]
186190
-- hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.memberIO notifierId notifiers) notifier
187191

188192
getQueue_ :: QueueParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
189-
getQueue_ st mkQ party qId = case party of
190-
SRecipient -> getRcvQueue qId
191-
SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue
192-
SSenderLink -> TM.lookupIO qId links >>= maybe (mask loadLinkQueue) getRcvQueue
193-
-- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server
194-
SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>))
193+
getQueue_ st mkQ party qId
194+
| useCache = case party of
195+
SRecipient -> getRcvQueue qId
196+
SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue
197+
SSenderLink -> TM.lookupIO qId links >>= maybe (mask loadLinkQueue) getRcvQueue
198+
-- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server
199+
SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>))
200+
| otherwise = case party of
201+
SRecipient -> loadQueueNoCache " WHERE recipient_id = ?"
202+
SSender -> loadQueueNoCache " WHERE sender_id = ?"
203+
SSenderLink -> loadQueueNoCache " WHERE link_id = ?"
204+
SNotifier -> loadQueueNoCache " WHERE notifier_id = ?"
195205
where
196-
PostgresQueueStore {queues, senders, links, notifiers} = st
206+
PostgresQueueStore {queues, senders, links, notifiers, useCache} = st
197207
getRcvQueue rId = TM.lookupIO rId queues >>= maybe (mask loadRcvQueue) (pure . Right)
198208
loadRcvQueue = do
199209
(rId, qRec) <- loadQueue " WHERE recipient_id = ?"
@@ -210,6 +220,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
210220
liftIO $
211221
TM.lookupIO rId queues -- checking recipient map first
212222
>>= maybe (cacheQueue rId qRec cacheSender) (atomically (cacheSender rId) $>)
223+
loadQueueNoCache cond = mask $ loadQueue cond >>= liftIO . uncurry (mkQ True)
213224
mask = E.uninterruptibleMask_ . runExceptT
214225
cacheSender rId = TM.insert qId rId senders
215226
loadQueue condition =
@@ -232,20 +243,27 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
232243
pure sq
233244

234245
getQueues_ :: forall p. BatchParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> [QueueId] -> IO [Either ErrorType q]
235-
getQueues_ st mkQ party qIds = case party of
236-
SRecipient -> do
237-
qs <- readTVarIO queues
238-
let qs' = map (\qId -> get qs qId qId) qIds
239-
E.uninterruptibleMask_ $ loadQueues qs' " WHERE recipient_id IN ?" cacheRcvQueue
240-
SNotifier -> do
241-
ns <- readTVarIO notifiers
242-
qs <- readTVarIO queues
243-
let qs' = map (\qId -> get ns qId qId >>= get qs qId) qIds
244-
E.uninterruptibleMask_ $ loadQueues qs' " WHERE notifier_id IN ?" $ \(rId, qRec) ->
245-
forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> -- it is always Just with this query
246-
(nId,) <$> maybe (mkQ False rId qRec) pure (M.lookup rId qs)
246+
getQueues_ st mkQ party qIds
247+
| null qIds = pure []
248+
| useCache = case party of
249+
SRecipient -> do
250+
qs <- readTVarIO queues
251+
let qs' = map (\qId -> get qs qId qId) qIds
252+
E.uninterruptibleMask_ $ loadQueues qs' " WHERE recipient_id IN ?" cacheRcvQueue
253+
SNotifier -> do
254+
ns <- readTVarIO notifiers
255+
qs <- readTVarIO queues
256+
let qs' = map (\qId -> get ns qId qId >>= get qs qId) qIds
257+
E.uninterruptibleMask_ $ loadQueues qs' " WHERE notifier_id IN ?" $ \(rId, qRec) ->
258+
forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> -- it is always Just with this query
259+
(nId,) <$> maybe (mkQ False rId qRec) pure (M.lookup rId qs)
260+
| otherwise = E.uninterruptibleMask_ $ case party of
261+
SRecipient -> loadQueuesNoCache " WHERE recipient_id IN ?" $ \(rId, qRec) ->
262+
Just . (rId,) <$> mkQ False rId qRec
263+
SNotifier -> loadQueuesNoCache " WHERE notifier_id IN ?" $ \(rId, qRec) ->
264+
forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> (nId,) <$> mkQ False rId qRec
247265
where
248-
PostgresQueueStore {queues, notifiers} = st
266+
PostgresQueueStore {queues, notifiers, useCache} = st
249267
get :: M.Map QueueId a -> QueueId -> QueueId -> Either QueueId a
250268
get m qId = maybe (Left qId) Right . (`M.lookup` m)
251269
loadQueues :: [Either QueueId q] -> Query -> ((RecipientId, QueueRec) -> IO (Maybe (QueueId, q))) -> IO [Either ErrorType q]
@@ -254,15 +272,16 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
254272
if null qIds'
255273
then pure $ map (first (const INTERNAL)) qs'
256274
else do
257-
qs_ <-
258-
runExceptT $ fmap M.fromList $
259-
withDB' "getQueues_" st (\db -> DB.query db (queueRecQuery <> cond <> " AND deleted_at IS NULL") (Only (In qIds')))
260-
>>= liftIO . fmap catMaybes . mapM (mkCacheQueue . rowToQueueRec)
275+
qs_ <- dbLoadQueues qIds' cond mkCacheQueue
261276
pure $ map (result qs_) qs'
262277
where
263278
result :: Either ErrorType (M.Map QueueId q) -> Either QueueId q -> Either ErrorType q
264279
result _ (Right q) = Right q
265280
result qs_ (Left qId) = maybe (Left AUTH) Right . M.lookup qId =<< qs_
281+
dbLoadQueues qIds' cond mkQueue' =
282+
runExceptT $ fmap M.fromList $
283+
withDB' "getQueues_" st (\db -> DB.query db (queueRecQuery <> cond <> " AND deleted_at IS NULL") (Only (In qIds')))
284+
>>= liftIO . fmap catMaybes . mapM (mkQueue' . rowToQueueRec)
266285
cacheRcvQueue (rId, qRec) = do
267286
sq <- mkQ True rId qRec
268287
sq' <- withQueueLock sq "getQueue_" $ atomically $
@@ -271,6 +290,12 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
271290
Just sq' -> pure sq'
272291
Nothing -> sq <$ TM.insert rId sq queues
273292
pure $ Just (rId, sq')
293+
loadQueuesNoCache cond mkQueue' = do
294+
qs_ <- dbLoadQueues qIds cond mkQueue'
295+
pure $ map (result qs_) qIds
296+
where
297+
result :: Either ErrorType (M.Map QueueId q) -> QueueId -> Either ErrorType q
298+
result qs_ qId = maybe (Left AUTH) Right . M.lookup qId =<< qs_
274299

275300
getQueueLinkData :: PostgresQueueStore q -> q -> LinkId -> IO (Either ErrorType QueueLinkData)
276301
getQueueLinkData st sq lnkId = runExceptT $ do
@@ -336,19 +361,23 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
336361
addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NtfCreds))
337362
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} =
338363
withQueueRec sq "addQueueNotifier" $ \q ->
339-
ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $
340-
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do
341-
assertUpdated $ withDB "addQueueNotifier" st $ \db ->
342-
E.try (update db) >>= bimapM handleDuplicate pure
343-
nc_ <- forM (notifier q) $ \nc@NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> nc
344-
let !q' = q {notifier = Just ntfCreds}
345-
atomically $ writeTVar (queueRec sq) $ Just q'
346-
-- cache queue notifier ID – after notifier is added ntf server will likely subscribe
364+
checkCachedNotifier $ do
365+
assertUpdated $ withDB "addQueueNotifier" st $ \db ->
366+
E.try (update db) >>= bimapM handleDuplicate pure
367+
nc_ <- forM (notifier q) $ \nc@NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> nc
368+
let !q' = q {notifier = Just ntfCreds}
369+
atomically $ writeTVar (queueRec sq) $ Just q'
370+
when useCache $ do
347371
atomically $ TM.insert nId rId notifiers
348-
withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds
349-
pure nc_
372+
withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds
373+
pure nc_
350374
where
351-
PostgresQueueStore {notifiers} = st
375+
checkCachedNotifier add
376+
| useCache =
377+
ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $
378+
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT add
379+
| otherwise = add
380+
PostgresQueueStore {notifiers, useCache} = st
352381
rId = recipientId sq
353382
update db =
354383
DB.execute
@@ -364,13 +393,16 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
364393
deleteQueueNotifier st sq =
365394
withQueueRec sq "deleteQueueNotifier" $ \q ->
366395
ExceptT $ fmap sequence $ forM (notifier q) $ \nc@NtfCreds {notifierId = nId} ->
367-
withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do
396+
withNotifierLock nId $ runExceptT $ do
368397
assertUpdated $ withDB' "deleteQueueNotifier" st update
369-
atomically $ TM.delete nId $ notifiers st
398+
when (useCache st) $ atomically $ TM.delete nId $ notifiers st
370399
atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing}
371400
withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId)
372401
pure nc
373402
where
403+
withNotifierLock nId
404+
| useCache st = withLockMap (notifierLocks st) nId "deleteQueueNotifier"
405+
| otherwise = id
374406
rId = recipientId sq
375407
update db =
376408
DB.execute
@@ -420,10 +452,11 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
420452
assertUpdated $ withDB' "deleteStoreQueue" st $ \db ->
421453
DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, rId)
422454
atomically $ writeTVar qr Nothing
423-
atomically $ TM.delete (senderId q) $ senders st
424-
forM_ (notifier q) $ \NtfCreds {notifierId} -> do
425-
atomically $ TM.delete notifierId $ notifiers st
426-
atomically $ TM.delete notifierId $ notifierLocks st
455+
when (useCache st) $ do
456+
atomically $ TM.delete (senderId q) $ senders st
457+
forM_ (notifier q) $ \NtfCreds {notifierId} -> do
458+
atomically $ TM.delete notifierId $ notifiers st
459+
atomically $ TM.delete notifierId $ notifierLocks st
427460
withLog "deleteStoreQueue" st (`logDeleteQueue` rId)
428461
pure q
429462
where

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,11 +1539,11 @@ testOldContactQueueShortLink ps@(_, msType) = withAgentClients2 $ \a b -> do
15391539
ASSCfg _ _ SSCMemoryJournal {storeLogFile} -> updateStoreLog storeLogFile
15401540
#if defined(dbServerPostgres)
15411541
ASSCfg _ _ SSCDatabaseJournal {storeCfg} -> do
1542-
st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) storeCfg
1542+
st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) (storeCfg, True)
15431543
updateDbStore st
15441544
closeQueueStore @(JournalQueue 'QSPostgres) st
15451545
ASSCfg _ _ (SSCDatabase storeCfg) -> do
1546-
st :: PostgresQueueStore PostgresQueue <- newQueueStore @PostgresQueue storeCfg
1546+
st :: PostgresQueueStore PostgresQueue <- newQueueStore @PostgresQueue (storeCfg, False)
15471547
updateDbStore st
15481548
closeQueueStore @PostgresQueue st
15491549
#else

0 commit comments

Comments
 (0)