Skip to content

Commit 2fe2076

Browse files
committed
service delivery WIP
1 parent c63d93d commit 2fe2076

File tree

6 files changed

+25
-22
lines changed

6 files changed

+25
-22
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,7 +1357,6 @@ forkClient Client {endThreads, endThreadSeq} label action = do
13571357

13581358
client :: forall s. MsgStoreClass s => Server s -> s -> Client s -> M s ()
13591359
client
1360-
-- TODO [certs rcv] rcv subscriptions
13611360
Server {subscribers, ntfSubscribers}
13621361
ms
13631362
clnt@Client {clientId, rcvQ, sndQ, msgQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
@@ -1805,23 +1804,27 @@ client
18051804
pure $ SOKS count idsHash
18061805
where
18071806
deliverServiceMessages expectedCnt = do
1808-
(qCnt, _msgCnt, _dupCnt, _errCnt) <- foldRcvServiceMessages ms serviceId deliverQueueMsg (0, 0, 0, 0)
1809-
atomically $ writeTBQueue msgQ [(NoCorrId, NoEntity, ALLS)]
1810-
-- TODO [certs rcv] compare with expected
1811-
logNote $ "Service subscriptions for " <> tshow serviceId <> " (" <> tshow qCnt <> " queues)"
1812-
deliverQueueMsg :: (Int, Int, Int, Int) -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO (Int, Int, Int, Int)
1813-
deliverQueueMsg (!qCnt, !msgCnt, !dupCnt, !errCnt) rId = \case
1814-
Left e -> pure (qCnt + 1, msgCnt, dupCnt, errCnt + 1) -- TODO [certs rcv] deliver subscription error
1807+
foldRcvServiceMessages ms serviceId deliverQueueMsg (0, 0, 0, []) >>= \case
1808+
Right (qCnt, _msgCnt, _dupCnt, _errCnt) -> do
1809+
atomically $ writeTBQueue msgQ [(NoCorrId, NoEntity, ALLS)]
1810+
-- TODO [certs rcv] compare with expected
1811+
logNote $ "Service subscriptions for " <> tshow serviceId <> " (" <> tshow qCnt <> " queues)"
1812+
Left e -> do
1813+
-- TODO [certs rcv] deliver SMP error
1814+
logError $ "Service subscriptions for " <> tshow serviceId <> " error: " <> tshow e
1815+
deliverQueueMsg :: (Int, Int, Int, [ErrorType]) -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO (Int, Int, Int, [ErrorType])
1816+
deliverQueueMsg (!qCnt, !msgCnt, !dupCnt, !errs) rId = \case
1817+
Left e -> pure (qCnt + 1, msgCnt, dupCnt, e : errs) -- TODO [certs rcv] deliver subscription error
18151818
Right qMsg_ -> case qMsg_ of
1816-
Nothing -> pure (qCnt + 1, msgCnt, dupCnt, errCnt)
1819+
Nothing -> pure (qCnt + 1, msgCnt, dupCnt, errs)
18171820
Just (qr, msg) ->
18181821
atomically (getSubscription rId) >>= \case
1819-
Nothing -> pure (qCnt + 1, msgCnt, dupCnt + 1, errCnt)
1822+
Nothing -> pure (qCnt + 1, msgCnt, dupCnt + 1, errs)
18201823
Just sub -> do
18211824
ts <- getSystemSeconds
18221825
atomically $ setDelivered sub msg ts
18231826
atomically $ writeTBQueue msgQ [(NoCorrId, rId, MSG (encryptMsg qr msg))]
1824-
pure (qCnt + 1, msgCnt + 1, dupCnt, errCnt)
1827+
pure (qCnt + 1, msgCnt + 1, dupCnt, errs)
18251828
getSubscription rId =
18261829
TM.lookup rId (subscriptions clnt) >>= \case
18271830
-- If delivery subscription already exists, then there is no need to deliver message.

src/Simplex/Messaging/Server/MsgStore/Journal.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,9 +444,9 @@ instance MsgStoreClass (JournalMsgStore s) where
444444
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
445445
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)
446446

447-
foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
447+
foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
448448
foldRcvServiceMessages ms serviceId f acc = case queueStore_ ms of
449-
MQStore st -> foldRcvServiceQueues st serviceId f' acc
449+
MQStore st -> fmap Right $ foldRcvServiceQueues st serviceId f' acc
450450
where
451451
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
452452
#if defined(dbServerPostgres)

src/Simplex/Messaging/Server/MsgStore/Postgres.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ instance MsgStoreClass PostgresMsgStore where
119119
toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) =
120120
MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues}
121121

122-
foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
122+
foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
123123
foldRcvServiceMessages ms serviceId f acc =
124-
withTransaction (dbStore $ queueStore_ ms) $ \db ->
124+
runExceptT $ withDB' "foldRcvServiceMessages" (queueStore_ ms) $ \db ->
125125
DB.fold
126126
db
127127
[sql|

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ instance MsgStoreClass STMMsgStore where
8787
expireOldMessages _tty ms now ttl =
8888
withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl)
8989

90-
foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
91-
foldRcvServiceMessages ms serviceId f=
92-
foldRcvServiceQueues (queueStore_ ms) serviceId $ \a (q, qr) ->
93-
runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
90+
foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
91+
foldRcvServiceMessages ms serviceId f = fmap Right . foldRcvServiceQueues (queueStore_ ms) serviceId f'
92+
where
93+
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
9494

9595
logQueueStates _ = pure ()
9696
{-# INLINE logQueueStates #-}

src/Simplex/Messaging/Server/MsgStore/Types.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
4545
unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
4646
-- tty, store, now, ttl
4747
expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
48-
foldRcvServiceMessages :: s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
48+
foldRcvServiceMessages :: s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO (Either ErrorType a)
4949
logQueueStates :: s -> IO ()
5050
logQueueState :: StoreQueue s -> StoreMonad s ()
5151
queueStore :: s -> QueueStore s

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -581,9 +581,9 @@ foldServiceRecs st f =
581581
DB.fold_ db "SELECT service_id, service_role, service_cert, service_cert_hash, created_at FROM services" mempty $
582582
\ !acc -> fmap (acc <>) . f . rowToServiceRec
583583

584-
foldRcvServiceQueueRecs :: PostgresQueueStore q -> ServiceId -> (a -> (RecipientId, QueueRec) -> IO a) -> a -> IO a
584+
foldRcvServiceQueueRecs :: PostgresQueueStore q -> ServiceId -> (a -> (RecipientId, QueueRec) -> IO a) -> a -> IO (Either ErrorType a)
585585
foldRcvServiceQueueRecs st serviceId f acc =
586-
withTransaction (dbStore st) $ \db ->
586+
runExceptT $ withDB' "foldRcvServiceQueueRecs" st $ \db ->
587587
DB.fold db (queueRecQuery <> " WHERE rcv_service_id = ? AND deleted_at IS NULL") (Only serviceId) acc $ \a -> f a . rowToQueueRec
588588

589589
foldQueueRecs :: Monoid a => Bool -> Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a

0 commit comments

Comments
 (0)