Skip to content

Commit 7c478a7

Browse files
committed
fix
1 parent 4e8fa40 commit 7c478a7

File tree

2 files changed

+8
-11
lines changed

2 files changed

+8
-11
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,7 +1492,7 @@ client
14921492
Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of
14931493
Just (q, QueueRec {notifier = Just ntfCreds}) ->
14941494
either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds)
1495-
$ batchSubs >>= sequence . M.lookup (recipientId q) . \(_, _, n) -> n
1495+
$ batchSubs >>= \(_, _, ntfAssocs) -> sequence (M.lookup (recipientId q) ntfAssocs)
14961496
_ -> pure $ ERR INTERNAL
14971497
Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of
14981498
Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash)
@@ -1505,11 +1505,9 @@ client
15051505
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
15061506
Cmd SRecipient command ->
15071507
case command of
1508-
SUB -> case batchSubs of
1508+
SUB -> case batchSubs >>= \(msgs, rcvAssocs, _) -> sequence (M.lookup entId rcvAssocs) $> msgs of
15091509
Left e -> pure $ Just (err e, Nothing)
1510-
Right (msgs, rcvAssocs, _) -> case sequence $ M.lookup entId rcvAssocs of
1511-
Left e -> pure $ Just (err e, Nothing)
1512-
Right _ -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
1510+
Right msgs -> withQueue' $ subscribeQueueAndDeliver $ M.lookup entId msgs
15131511
GET -> withQueue getMessage
15141512
ACK msgId -> withQueue $ acknowledgeMsg msgId
15151513
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1654,7 +1652,7 @@ client
16541652
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
16551653
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
16561654
Nothing ->
1657-
deliver =<< sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
1655+
deliver =<< sharedSubscribeQueue q rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices
16581656
Just s@Sub {subThread} -> do
16591657
stats <- asks serverStats
16601658
case subThread of
@@ -1756,23 +1754,21 @@ client
17561754

17571755
subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
17581756
subscribeNotifications q NtfCreds {ntfServiceId} = do
1759-
(hasSub, _) <- sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices
1757+
(hasSub, _) <- sharedSubscribeQueue q ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices
17601758
when (isNothing clntServiceId) $
17611759
asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
17621760
pure $ SOK clntServiceId
17631761

17641762
sharedSubscribeQueue ::
1765-
(PartyI p, ServiceParty p) =>
17661763
StoreQueue s ->
1767-
SParty p ->
17681764
Maybe ServiceId ->
17691765
ServerSubscribers s ->
17701766
(Client s -> TMap QueueId sub) ->
17711767
(Client s -> TVar (Int64, IdsHash)) ->
17721768
STM sub ->
17731769
(ServerStats -> ServiceStats) ->
17741770
M s (Bool, Maybe sub)
1775-
sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
1771+
sharedSubscribeQueue q queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
17761772
stats <- asks serverStats
17771773
let incSrvStat sel = incStat $ sel $ servicesSel stats
17781774
writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId)

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ import Simplex.Messaging.SystemTime
9191
import Simplex.Messaging.TMap (TMap)
9292
import qualified Simplex.Messaging.TMap as TM
9393
import Simplex.Messaging.Transport (SMPServiceRole (..))
94-
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>))
94+
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, maybeFirstRow', tshow, (<$$>), ($>>=))
9595
import System.Exit (exitFailure)
9696
import System.IO (IOMode (..), hFlush, stdout)
9797
import UnliftIO.STM
@@ -504,6 +504,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
504504
atomically $ writeTVar (queueRec sq) $ Just q'
505505
withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId
506506

507+
setQueueServices :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> Maybe ServiceId -> [q] -> IO (Either ErrorType (M.Map RecipientId (Either ErrorType ())))
507508
setQueueServices _ _ _ [] = pure $ Right M.empty
508509
setQueueServices st party serviceId qs = E.uninterruptibleMask_ $ runExceptT $ do
509510
updated <- S.fromList <$> withDB' "setQueueServices" st (\db ->

0 commit comments

Comments
 (0)